abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/657
Change subject: Introduce NC to NC Messaging
......................................................................
Introduce NC to NC Messaging
This change introduces NC-to-NC messaging. The NC message
broker listens on the NC messaging port and sends
messages to other NCs.
Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
M
asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M
asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A
asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
A asterix-app/src/main/resources/cluster.xml
M
asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M
asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M
asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M
asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M
asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
M
asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M
asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-external-data/pom.xml
A
asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
A
asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
A
asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M
asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M
asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
M
asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M
asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
33 files changed, 617 insertions(+), 59 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/57/657/1
diff --git
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e33aed2..e3493c6 100644
---
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -87,7 +87,7 @@
if (tempPath.endsWith(File.separator)) {
tempPath = tempPath.substring(0, tempPath.length() - 1);
}
- //get initial partitions from properties
+ // get initial partitions from properties
String[] nodeStores = propertiesAccessor.getStores().get(ncName);
if (nodeStores == null) {
throw new Exception("Coudn't find stores for NC: " + ncName);
@@ -97,7 +97,7 @@
tempDirPath += File.separator;
}
for (int p = 0; p < nodeStores.length; p++) {
- //create IO devices based on stores
+ // create IO devices based on stores
String iodevicePath = tempDirPath + ncConfig1.nodeId +
File.separator + nodeStores[p];
File ioDeviceDir = new File(iodevicePath);
ioDeviceDir.mkdirs();
diff --git
a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4922ae6..58523f4 100644
---
a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++
b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -122,7 +122,7 @@
if (replicationEnabled) {
if (systemState == SystemState.NEW_UNIVERSE || systemState ==
SystemState.CORRUPTED) {
- //Try to perform remote recovery
+ // Try to perform remote recovery
IRemoteRecoveryManager remoteRecoveryMgr =
runtimeContext.getRemoteRecoveryManager();
if (autoFailover) {
remoteRecoveryMgr.startFailbackProcess();
@@ -153,13 +153,13 @@
}
private void startReplicationService() {
- //Open replication channel
+ // Open replication channel
runtimeContext.getReplicationChannel().start();
- //Check the state of remote replicas
+ // Check the state of remote replicas
runtimeContext.getReplicationManager().initializeReplicasState();
- //Start replication after the state of remote replicas has been
initialized.
+ // Start replication after the state of remote replicas has been
initialized.
runtimeContext.getReplicationManager().startReplicationThreads();
}
@@ -176,10 +176,10 @@
MetadataBootstrap.stopUniverse();
}
- //Clean any temporary files
+ // Clean any temporary files
performLocalCleanUp();
- //Note: stopping recovery manager will make a sharp checkpoint
+ // Note: stopping recovery manager will make a sharp checkpoint
ncApplicationContext.getLifeCycleComponentManager().stopAll(false);
runtimeContext.deinitialize();
} else {
@@ -191,7 +191,7 @@
@Override
public void notifyStartupComplete() throws Exception {
- //Send max resource id on this NC to the CC
+ // Send max resource id on this NC to the CC
((INCMessageBroker)
ncApplicationContext.getMessageBroker()).reportMaxResourceId();
AsterixMetadataProperties metadataProperties =
((IAsterixPropertiesProvider) runtimeContext)
@@ -245,15 +245,15 @@
}
}
- //Clean any temporary files
+ // Clean any temporary files
performLocalCleanUp();
}
private void performLocalCleanUp() {
- //Delete working area files from failed jobs
+ // Delete working area files from failed jobs
runtimeContext.getIOManager().deleteWorkspaceFiles();
- //Reclaim storage for temporary datasets.
+ // Reclaim storage for temporary datasets.
String storageDirName =
AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
String[] ioDevices = ((PersistentLocalResourceRepository)
runtimeContext.getLocalResourceRepository())
.getStorageMountingPoints();
@@ -263,9 +263,9 @@
FileUtils.deleteQuietly(new File(tempDatasetsDir));
}
- //TODO
- //Reclaim storage for orphaned index artifacts in NCs.
- //Note: currently LSM indexes invalid components are deleted when an
index is activated.
+ // TODO
+ // Reclaim storage for orphaned index artifacts in NCs.
+ // Note: currently LSM indexes invalid components are deleted when an
index is activated.
}
private void updateOnNodeJoin() {
diff --git
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917..de3848e 100644
---
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -24,7 +24,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.api.common.AsterixAppRuntimeContext;
import org.apache.asterix.common.messaging.AbstractApplicationMessage;
import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
@@ -43,6 +43,7 @@
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -54,16 +55,21 @@
private final NodeControllerService ncs;
private final AtomicLong messageId = new AtomicLong(0);
private final Map<Long, IApplicationMessageCallback> callbacks;
- private final IAsterixAppRuntimeContext appContext;
+ private final NCMessagingServer messageServer;
+
+ private final AsterixAppRuntimeContext appContext;
public NCMessageBroker(NodeControllerService ncs) {
this.ncs = ncs;
- appContext = (IAsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+ // Start listening to NC messages
+ appContext = (AsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
+ messageServer = new
NCMessagingServer(appContext.getMetadataProperties().getNodeMessagingPort(ncs.getId()));
+ messageServer.start();
}
@Override
- public void sendMessage(IApplicationMessage message,
IApplicationMessageCallback callback) throws Exception {
+ public void sendMessageToCC(IApplicationMessage message,
IApplicationMessageCallback callback) throws Exception {
if (callback != null) {
long uniqueMessageId = messageId.incrementAndGet();
message.setId(uniqueMessageId);
@@ -73,7 +79,7 @@
ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
} catch (Exception e) {
if (callback != null) {
- //remove the callback in case of failure
+ // remove the callback in case of failure
callbacks.remove(message.getId());
}
throw e;
@@ -131,7 +137,7 @@
//send response after takeover is completed
TakeoverPartitionsResponseMessage reponse = new
TakeoverPartitionsResponseMessage(msg.getRequestId(),
appContext.getTransactionSubsystem().getId(),
msg.getPartitions());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
}
@@ -142,18 +148,18 @@
} finally {
TakeoverMetadataNodeResponseMessage reponse = new
TakeoverMetadataNodeResponseMessage(
appContext.getTransactionSubsystem().getId());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
}
@Override
public void reportMaxResourceId() throws Exception {
ReportMaxResourceIdMessage maxResourceIdMsg = new
ReportMaxResourceIdMessage();
- //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for
metadata indexes.
+ // resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for
metadata indexes.
long maxResourceId =
Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
maxResourceIdMsg.setMaxResourceId(maxResourceId);
- sendMessage(maxResourceIdMsg, null);
+ sendMessageToCC(maxResourceIdMsg, null);
}
private void handleReplicaEvent(IMessage message) {
@@ -195,7 +201,7 @@
//send response after partitions prepared for failback
PreparePartitionsFailbackResponseMessage reponse = new
PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
msg.getRequestId(), msg.getPartitions());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
private void handleCompleteFailbackRequest(IMessage message) throws
Exception {
@@ -206,7 +212,15 @@
} finally {
CompleteFailbackResponseMessage reponse = new
CompleteFailbackResponseMessage(msg.getPlanId(),
msg.getRequestId(), msg.getPartitions());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
}
+
+ @Override
+ public void sendMessageToNC(String to, IApplicationMessage message,
IApplicationMessageCallback callback)
+ throws Exception {
+ NCMessagingClient client = new
NCMessagingClient(AsterixClusterProperties.INSTANCE.getNodeIpAddress(to),
+
appContext.getMetadataProperties().getNodeMessagingPort(ncs.getId()), 4);
+ client.start();
+ }
}
diff --git
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
new file mode 100644
index 0000000..e34a0f3
--- /dev/null
+++
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
+public class NCMessageDecoder extends ReplayingDecoder<Void> {
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception {
+ System.err.println("NCMessageDecoder: decode called");
+ out.add(in.readBytes(4));
+ }
+}
diff --git
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
new file mode 100644
index 0000000..63eb888
--- /dev/null
+++
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import javax.net.ssl.SSLException;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public class NCMessagingClient {
+ private final String host;
+ private final int port;
+ private final int size;
+
+ public NCMessagingClient(String host, int port, int size) {
+ this.host = host;
+ this.port = port;
+ this.size = size;
+ }
+
+ public void start() throws SSLException, InterruptedException {
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ Bootstrap b = new Bootstrap();
+ b.group(group).channel(NioSocketChannel.class).handler(new
ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new NCMessagingClientHandler(size));
+ }
+ });
+
+ // Make the connection attempt.
+ ChannelFuture f = b.connect(host, port).sync();
+
+ // Wait until the connection is closed.
+ f.channel().closeFuture().sync();
+ } finally {
+ group.shutdownGracefully();
+ }
+ }
+}
diff --git
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
new file mode 100644
index 0000000..40dd153
--- /dev/null
+++
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+public class NCMessagingClientHandler extends
SimpleChannelInboundHandler<Object> {
+
+ private ByteBuf content;
+ private ChannelHandlerContext ctx;
+ private int size;
+
+ public NCMessagingClientHandler(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+
+ // Initialize the message.
+ content = ctx.alloc().directBuffer(size);
+ content.writeInt(10);
+
+ // Send the initial messages.
+ generateTraffic();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ content.release();
+ }
+
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ // Server is supposed to send nothing, but if it sends something,
discard it.
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ // Close the connection when an exception is raised.
+ cause.printStackTrace();
+ ctx.close();
+ }
+
+ long counter;
+
+ private void generateTraffic() {
+ // Flush the outbound buffer to the socket.
+ // Once flushed, generate the same amount of traffic again.
+
ctx.writeAndFlush(content.duplicate().retain()).addListener(trafficGenerator);
+ }
+
+ private final ChannelFutureListener trafficGenerator = new
ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isSuccess()) {
+ future.channel().close();
+ } else {
+ future.cause().printStackTrace();
+ future.channel().close();
+ }
+ }
+ };
+
+}
diff --git
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
new file mode 100644
index 0000000..3cb6aaa
--- /dev/null
+++
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+public class NCMessagingHandler extends ChannelInboundHandlerAdapter {
+ private ByteBuf buf;
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) {
+ buf = ctx.alloc().buffer(4);
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) {
+ buf.release();
+ buf = null;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ System.err.println("channelRead Called");
+ ByteBuf m = (ByteBuf) msg;
+ buf.writeBytes(m);
+ m.release();
+
+ if (buf.readableBytes() >= 4) {
+ System.out.println(buf.readInt());
+ ctx.close();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ cause.printStackTrace();
+ ctx.close();
+ }
+}
diff --git
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
new file mode 100644
index 0000000..dbf4c0e
--- /dev/null
+++
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+public class NCMessagingServer {
+ private int port;
+ private ChannelInboundHandlerAdapter handler = new NCMessagingHandler();
+ private ChannelFuture f;
+
+ public NCMessagingServer(int port) {
+ this.port = port;
+ }
+
+ public void stop() {
+ f.channel().close();
+ }
+
+ public void start() {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup,
workerGroup).channel(NioServerSocketChannel.class)
+ .childHandler(new
ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws
Exception {
+ ch.pipeline().addLast(new NCMessageDecoder(), new
NCMessagingHandler());
+ }
+ }).option(ChannelOption.SO_BACKLOG,
128).childOption(ChannelOption.SO_KEEPALIVE, true);
+
+ // Bind and start to accept incoming connections.
+ ChannelFuture f = b.bind(port).sync();
+
+ // Wait until the server socket is closed.
+ // In this example, this does not happen, but you can do
that to gracefully
+ // shut down your server.
+ f.channel().closeFuture().sync();
+ } catch (Throwable th) {
+ // Do something
+ th.printStackTrace();
+ } finally {
+ workerGroup.shutdownGracefully();
+ bossGroup.shutdownGracefully();
+ }
+ }
+ });
+ thread.setName("NCMessagingService:" + port);
+ thread.start();
+ }
+}
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml
b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 731113b..2dd5cfc 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -20,10 +20,12 @@
<metadataNode>asterix_nc1</metadataNode>
<store>
<ncId>asterix_nc1</ncId>
+ <nc_messaging_port>4501</nc_messaging_port>
<storeDirs>iodevice0,iodevice1</storeDirs>
</store>
<store>
<ncId>asterix_nc2</ncId>
+ <nc_messaging_port>4502</nc_messaging_port>
<storeDirs>iodevice0,iodevice1</storeDirs>
</store>
<transactionLogDir>
diff --git a/asterix-app/src/main/resources/cluster.xml
b/asterix-app/src/main/resources/cluster.xml
new file mode 100644
index 0000000..7c7e321
--- /dev/null
+++ b/asterix-app/src/main/resources/cluster.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+ <instance_name>asterix</instance_name>
+ <store>storage</store>
+ <master_node>
+ <id>master</id>
+ <client_ip>127.0.0.1</client_ip>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <nc_messaging_port>4503</nc_messaging_port>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <nc_messaging_port>4501</nc_messaging_port>
+ </node>
+ <node>
+ <id>nc2</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <nc_messaging_port>4502</nc_messaging_port>
+ </node>
+</cluster>
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 473a163..289a8ed 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -69,4 +69,8 @@
public Map<String, String> getTransactionLogDirs() {
return accessor.getTransactionLogDirs();
}
+
+ public int getNodeMessagingPort(String nodeId) {
+ return accessor.getNodeMessagingPort(nodeId);
+ }
}
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 13ce403..3c31936 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -57,7 +57,8 @@
private final Map<String, String> transactionLogDirs;
private final Map<String, String> asterixBuildProperties;
private final Map<String, ClusterPartition[]> nodePartitionsMap;
- private SortedMap<Integer, ClusterPartition> clusterPartitions;
+ private final SortedMap<Integer, ClusterPartition> clusterPartitions;
+ private final Map<String, Integer> node2NCMessagingPort;
public AsterixPropertiesAccessor() throws AsterixException {
String fileName =
System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
@@ -86,7 +87,9 @@
instanceName = asterixConfiguration.getInstanceName();
metadataNodeName = asterixConfiguration.getMetadataNode();
stores = new HashMap<String, String[]>();
+ node2NCMessagingPort = new HashMap<>();
List<Store> configuredStores = asterixConfiguration.getStore();
+
nodeNames = new HashSet<String>();
nodePartitionsMap = new HashMap<>();
clusterPartitions = new TreeMap<>();
@@ -103,6 +106,7 @@
stores.put(store.getNcId(), nodeStores);
nodePartitionsMap.put(store.getNcId(), nodePartitions);
nodeNames.add(store.getNcId());
+ node2NCMessagingPort.put(store.getNcId(),
Integer.parseInt(store.getNcMessagingPort()));
}
asterixConfigurationParams = new HashMap<String, Property>();
for (Property p : asterixConfiguration.getProperty()) {
@@ -208,4 +212,8 @@
public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
return clusterPartitions;
}
+
+ public int getNodeMessagingPort(String nodeId) {
+ return node2NCMessagingPort.get(nodeId);
+ }
}
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 5d2e263..95fe37b 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -42,7 +42,7 @@
/**
* Sets a unique message id that identifies this message within an NC.
- * This id is set by {@link
INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+ * This id is set by {@link
INCMessageBroker#sendMessageToCC(IApplicationMessage,
IApplicationMessageCallback)}
* when the callback is not null to notify the sender when the response to
that message is received.
*
* @param messageId
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 41f8a0c..724ed04 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,17 @@
* @param callback
* @throws Exception
*/
- public void sendMessage(IApplicationMessage message,
IApplicationMessageCallback callback) throws Exception;
+ public void sendMessageToCC(IApplicationMessage message,
IApplicationMessageCallback callback) throws Exception;
+
+ /**
+ * Sends application message from this NC to another NC.
+ *
+ * @param message
+ * @param callback
+ * @throws Exception
+ */
+ public void sendMessageToNC(String to, IApplicationMessage message,
IApplicationMessageCallback callback)
+ throws Exception;
/**
* Sends the maximum resource id on this NC to the CC.
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd
b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index bb99319..3a66421 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -56,12 +56,16 @@
<xs:element
name="txnLogDirPath"
type="xs:string" />
-
+ <xs:element
+ name="nc_messaging_port"
+ type="xs:string" />
+
<!-- definition of complex elements -->
<xs:element name="store">
<xs:complexType>
<xs:sequence>
<xs:element ref="mg:ncId" />
+ <xs:element ref="mg:nc_messaging_port" />
<xs:element ref="mg:storeDirs" />
</xs:sequence>
</xs:complexType>
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd
b/asterix-common/src/main/resources/schema/cluster.xsd
index 935d33f..1ec9a64 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -57,6 +57,7 @@
<xs:element name="result_time_to_live" type="xs:long" />
<xs:element name="result_sweep_threshold" type="xs:long" />
<xs:element name="cc_root" type="xs:string" />
+ <xs:element name="nc_messaging_port" type="xs:integer" />
<!-- definition of complex elements -->
<xs:element name="working_dir">
@@ -123,6 +124,7 @@
<xs:element ref="cl:txn_log_dir" minOccurs="0"
/>
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:debug_port" minOccurs="0" />
+ <xs:element ref="cl:nc_messaging_port" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
@@ -151,6 +153,7 @@
<xs:element ref="cl:metadata_node" />
<xs:element ref="cl:data_replication"
minOccurs="0" />
<xs:element ref="cl:master_node" />
+ <xs:element ref="cl:nc_messaging_port" minOccurs="0" />
<xs:element ref="cl:node" maxOccurs="unbounded"
/>
<xs:element ref="cl:substitute_nodes" />
<xs:element ref="cl:heartbeat_period" minOccurs="0" />
diff --git
a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index 29765fd..c92262c 100644
---
a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++
b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
public class EventDriver {
public static final String CLIENT_NODE_ID = "client_node";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID,
"127.0.0.1", null, null, null, null, null);
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID,
"127.0.0.1", null, null, null, null, null, null);
private static String eventsDir;
private static Events events;
diff --git
a/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
b/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
index e0e5bc4..b8c7f7c 100644
---
a/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
+++
b/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
@@ -25,13 +25,12 @@
import java.util.Timer;
import java.util.TimerTask;
-import org.apache.log4j.Logger;
-
import org.apache.asterix.event.driver.EventDriver;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.event.schema.event.Event;
import org.apache.asterix.event.schema.pattern.Pattern;
import org.apache.asterix.event.schema.pattern.Period;
+import org.apache.log4j.Logger;
public class EventTask extends TimerTask {
@@ -68,8 +67,8 @@
this.interval = EventUtil.parseTimeInterval(period.getAbsvalue(),
period.getUnit());
}
if (pattern.getDelay() != null) {
- this.initialDelay = EventUtil.parseTimeInterval(new
ValueType(pattern.getDelay().getValue()), pattern
- .getDelay().getUnit());
+ this.initialDelay = EventUtil.parseTimeInterval(new
ValueType(pattern.getDelay().getValue()),
+ pattern.getDelay().getUnit());
}
if (pattern.getMaxOccurs() != null) {
this.maxOccurs = pattern.getMaxOccurs();
diff --git
a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index b83faa2..c764cd7 100644
---
a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++
b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -186,12 +186,12 @@
}
if (nodeid.equals(cluster.getMasterNode().getId())) {
- String logDir = cluster.getMasterNode().getLogDir() == null ?
cluster.getLogDir() : cluster.getMasterNode()
- .getLogDir();
- String javaHome = cluster.getMasterNode().getJavaHome() == null ?
cluster.getJavaHome() : cluster
- .getMasterNode().getJavaHome();
+ String logDir = cluster.getMasterNode().getLogDir() == null ?
cluster.getLogDir()
+ : cluster.getMasterNode().getLogDir();
+ String javaHome = cluster.getMasterNode().getJavaHome() == null ?
cluster.getJavaHome()
+ : cluster.getMasterNode().getJavaHome();
return new Node(cluster.getMasterNode().getId(),
cluster.getMasterNode().getClusterIp(), javaHome, logDir,
- null, null, cluster.getMasterNode().getDebugPort());
+ null, null, cluster.getMasterNode().getDebugPort(), null);
}
List<Node> nodeList = cluster.getNode();
@@ -231,8 +231,8 @@
pb.start();
}
- public static void executeLocalScript(Node node, String script,
List<String> args) throws IOException,
- InterruptedException {
+ public static void executeLocalScript(Node node, String script,
List<String> args)
+ throws IOException, InterruptedException {
List<String> pargs = new ArrayList<String>();
pargs.add("/bin/bash");
pargs.add(script);
diff --git
a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index ecbafa7..0f4c9f5 100644
---
a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++
b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -278,6 +278,7 @@
configuration.setMetadataNode(asterixInstanceName + "_" +
metadataNodeId);
List<Store> stores = new ArrayList<Store>();
String storeDir = cluster.getStore().trim();
+ String nodeFeedPort;
for (Node node : cluster.getNode()) {
String iodevices = node.getIodevices() == null ?
cluster.getIodevices() : node.getIodevices();
String[] nodeIdDevice = iodevices.split(",");
@@ -287,7 +288,9 @@
}
//remove last comma
nodeStores.deleteCharAt(nodeStores.length() - 1);
- stores.add(new Store(asterixInstanceName + "_" + node.getId(),
nodeStores.toString()));
+ nodeFeedPort = node.getNcMessagingPort() == null ?
String.valueOf(cluster.getNcMessagingPort().intValue())
+ : String.valueOf(node.getNcMessagingPort().intValue());
+ stores.add(new Store(asterixInstanceName + "_" + node.getId(),
nodeFeedPort, nodeStores.toString()));
}
configuration.setStore(stores);
List<Coredump> coredump = new ArrayList<Coredump>();
@@ -354,6 +357,7 @@
private static void zipDir(File sourceDir, final File destFile,
ZipOutputStream zos) throws IOException {
File[] dirList = sourceDir.listFiles(new FileFilter() {
+ @Override
public boolean accept(File f) {
return !f.getName().endsWith(destFile.getName());
}
@@ -391,7 +395,7 @@
byte[] buffer = new byte[2048];
int read;
while (entries.hasMoreElements()) {
- JarEntry entry = (JarEntry) entries.nextElement();
+ JarEntry entry = entries.nextElement();
String name = entry.getName();
if (name.equals(origFile)) {
continue;
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 7801fd7..38b4824 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -287,5 +287,10 @@
<artifactId>rxjava</artifactId>
<version>1.0.15</version>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.0.33.Final</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
new file mode 100644
index 0000000..7796082
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+
+/**
+ * Thread subclass whose constructor and {@link #run()} bodies are currently empty.
+ * NOTE(review): looks like a placeholder for a feed channel management thread -- confirm the intended behavior.
+ */
+public class FeedChannelManager extends Thread {
+
+ /**
+ * Creates the manager; neither argument is stored or used at present.
+ *
+ * @param feedProperties
+ *            currently unused
+ * @param nodeId
+ *            currently unused
+ */
+ public FeedChannelManager(AsterixFeedProperties feedProperties, String
nodeId) {
+ }
+
+ /** Does nothing, so a started instance terminates immediately. */
+ @Override
+ public void run() {
+ }
+}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
index 5095e7d..3965c6b 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
@@ -45,6 +45,8 @@
private static final Logger LOGGER =
Logger.getLogger(FeedManager.class.getName());
+ private FeedChannelManager feedChannelManager;
+
private final IFeedSubscriptionManager feedSubscriptionManager;
private final IFeedConnectionManager feedConnectionManager;
@@ -76,9 +78,11 @@
?
AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp() :
"localhost";
this.feedMessageService = new FeedMessageService(feedProperties,
nodeId, ccClusterIp);
this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
+ this.feedChannelManager = new FeedChannelManager(feedProperties,
nodeId);
try {
this.feedMessageService.start();
this.nodeLoadReportService.start();
+ this.feedChannelManager.start();
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Unable to start feed services " +
e.getMessage());
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
new file mode 100644
index 0000000..22c9a94
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * Inbound channel handler that overrides nothing, so every callback keeps the
+ * {@link ChannelInboundHandlerAdapter} default behavior.
+ */
+public class FeedMessageHandler extends ChannelInboundHandlerAdapter {
+
+}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
new file mode 100644
index 0000000..171b925
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+/**
+ * Netty server that installs a {@link FeedMessageHandler} at the end of the pipeline of
+ * every accepted connection.
+ */
+public class FeedMessageServer {
+    private final int port;
+    private final int numberOfChannels;
+
+    /**
+     * @param port
+     *            the port handed to {@code bind}
+     * @param numOfClusterNodes
+     *            the value used for the listening socket's {@code SO_BACKLOG} option
+     */
+    public FeedMessageServer(int port, int numOfClusterNodes) {
+        this.numberOfChannels = numOfClusterNodes;
+        this.port = port;
+    }
+
+    /**
+     * Binds the server to the configured port and then blocks the calling thread until the
+     * server channel is closed. Both event loop groups are asked to shut down on the way out,
+     * whether the method returns normally or throws.
+     *
+     * @throws Exception
+     *             propagated from binding or from waiting on the channel futures
+     */
+    public void start() throws Exception {
+        EventLoopGroup acceptors = new NioEventLoopGroup();
+        EventLoopGroup workers = new NioEventLoopGroup();
+        try {
+            ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel channel) throws Exception {
+                    channel.pipeline().addLast(new FeedMessageHandler());
+                }
+            };
+            ServerBootstrap bootstrap = new ServerBootstrap();
+            bootstrap.group(acceptors, workers);
+            bootstrap.channel(NioServerSocketChannel.class);
+            bootstrap.childHandler(pipelineSetup);
+            bootstrap.option(ChannelOption.SO_BACKLOG, numberOfChannels);
+            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+            ChannelFuture bound = bootstrap.bind(port).sync();
+            // Wait here until the server channel is closed.
+            bound.channel().closeFuture().sync();
+        } finally {
+            workers.shutdownGracefully();
+            acceptors.shutdownGracefully();
+        }
+    }
+}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
index 870a6df..bb78233 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -38,10 +38,9 @@
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.hdfs.scheduler.Scheduler;
public class IndexingScheduler {
- private static final Logger LOGGER =
Logger.getLogger(Scheduler.class.getName());
+ private static final Logger LOGGER =
Logger.getLogger(IndexingScheduler.class.getName());
/** a list of NCs */
private String[] NCs;
diff --git
a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 9559394..4037eaf 100644
---
a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++
b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -105,7 +105,7 @@
MasterNode masterNode = cluster.getMasterNode();
Node master = new Node(masterNode.getId(),
masterNode.getClusterIp(), masterNode.getJavaHome(),
- masterNode.getLogDir(), null, null, null);
+ masterNode.getLogDir(), null, null, null, null);
ipAddresses.add(masterNode.getClusterIp());
valid = valid & validateNodeConfiguration(master, cluster);
diff --git a/asterix-installer/src/main/resources/clusters/demo/demo.xml
b/asterix-installer/src/main/resources/clusters/demo/demo.xml
index 1932fcc..8d5f13b 100644
--- a/asterix-installer/src/main/resources/clusters/demo/demo.xml
+++ b/asterix-installer/src/main/resources/clusters/demo/demo.xml
@@ -32,6 +32,7 @@
<log_dir>/tmp/asterix/logs</log_dir>
<store>storage</store>
<java_home></java_home>
+ <nc_messaging_port>5403</nc_messaging_port>
<master_node>
<id>master</id>
<client_ip>127.0.0.1</client_ip>
@@ -45,11 +46,13 @@
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/node1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/node1/1,/tmp/asterix/node1/2</iodevices>
+ <nc_messaging_port>5401</nc_messaging_port>
</node>
<node>
<id>node2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/node2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/node2/1,/tmp/asterix/node2/2</iodevices>
+ <nc_messaging_port>5402</nc_messaging_port>
</node>
</cluster>
diff --git a/asterix-installer/src/main/resources/clusters/local/local.xml
b/asterix-installer/src/main/resources/clusters/local/local.xml
index 20f697f..7ed8b74 100644
--- a/asterix-installer/src/main/resources/clusters/local/local.xml
+++ b/asterix-installer/src/main/resources/clusters/local/local.xml
@@ -40,6 +40,7 @@
<result_time_to_live>86400000</result_time_to_live>
<!-- The duration within which an instance of the result cleanup should be
invoked in milliseconds. (default: 1 minute) -->
<result_sweep_threshold>60000</result_sweep_threshold>
+ <nc_messaging_port>5403</nc_messaging_port>
<master_node>
<id>master</id>
<client_ip>127.0.0.1</client_ip>
@@ -53,12 +54,13 @@
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
-
+ <nc_messaging_port>5401</nc_messaging_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
+ <nc_messaging_port>5402</nc_messaging_port>
</node>
</cluster>
diff --git
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index 3e37694..e7bf3bf 100644
---
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -62,9 +62,10 @@
private ClusterManager() {
Cluster asterixCluster =
AsterixClusterProperties.INSTANCE.getCluster();
- String eventHome = asterixCluster == null ? null :
asterixCluster.getWorkingDir().getDir();
+ String eventHome = asterixCluster == null ? null
+ : asterixCluster.getWorkingDir() == null ? null :
asterixCluster.getWorkingDir().getDir();
- if (asterixCluster != null) {
+ if (eventHome != null) {
String asterixDir = System.getProperty("user.dir") +
File.separator + "asterix";
File configFile = new File(System.getProperty("user.dir") +
File.separator + "configuration.xml");
Configuration configuration = null;
@@ -74,8 +75,8 @@
Unmarshaller unmarshaller = configCtx.createUnmarshaller();
configuration = (Configuration)
unmarshaller.unmarshal(configFile);
AsterixEventService.initialize(configuration, asterixDir,
eventHome);
- client =
AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE
- .getCluster());
+ client = AsterixEventService
+
.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster());
lookupService = ServiceProvider.INSTANCE.getLookupService();
if (!lookupService.isRunning(configuration)) {
diff --git
a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 3173525..faa0236 100644
---
a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++
b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -96,6 +96,7 @@
private Set<String> failedNodes = new HashSet<>();
private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+ private HashMap<String, String> node2Ip = null;
private AsterixClusterProperties() {
InputStream is =
this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -686,4 +687,20 @@
}
return stateDescription;
}
-}
+
+    /**
+     * Returns the cluster IP configured for a node.
+     * <p>
+     * The lookup table is built on first use from the cluster's node list; each key is
+     * {@code <instanceName>_<nodeId>}. The method is synchronized and the table is assigned
+     * to the field only after it is completely filled, so no caller can observe a
+     * partially populated table (the earlier double-checked locking on a non-volatile
+     * field allowed that).
+     *
+     * @param nodeId
+     *            the node id in the {@code <instanceName>_<nodeId>} form
+     * @return the node's cluster IP, or {@code null} if no node is registered under that id
+     */
+    public synchronized String getNodeIpAddress(String nodeId) {
+        if (node2Ip == null) {
+            HashMap<String, String> ipByNode = new HashMap<>();
+            String instanceName = cluster.getInstanceName();
+            for (Node node : cluster.getNode()) {
+                ipByNode.put(instanceName + "_" + node.getId(), node.getClusterIp());
+            }
+            // Publish only the fully built map.
+            node2Ip = ipByNode;
+        }
+        return node2Ip.get(nodeId);
+    }
+}
\ No newline at end of file
diff --git
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
index 5b29530..ab1ebe1 100644
---
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -57,7 +57,7 @@
//if no response available or it has an exception, request a new
one
if (reponse == null || reponse.getException() != null) {
ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
- ((INCMessageBroker)
appCtx.getMessageBroker()).sendMessage(msg, this);
+ ((INCMessageBroker)
appCtx.getMessageBroker()).sendMessageToCC(msg, this);
reponse = (ResourceIdRequestResponseMessage)
resourceIdResponseQ.take();
if (reponse.getException() != null) {
throw new
HyracksDataException(reponse.getException().getMessage());
diff --git
a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
index f9d10af..c834128 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
@@ -1358,7 +1358,7 @@
}
//remove last comma
nodeStores.deleteCharAt(nodeStores.length() - 1);
- stores.add(new Store(node.getId(), nodeStores.toString()));
+ stores.add(new Store(node.getId(), "4501", nodeStores.toString()));
}
configuration.setStore(stores);
List<Coredump> coredump = new ArrayList<Coredump>();
@@ -1369,10 +1369,13 @@
coredumpDir = node.getLogDir() == null ? cluster.getLogDir() :
node.getLogDir();
coredump.add(new Coredump(node.getId(), coredumpDir + "coredump" +
File.separator));
txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() :
node.getTxnLogDir(); //node or cluster-wide
- txnLogDirs.add(new TransactionLogDir(node.getId(), txnLogDir
- + (txnLogDir.charAt(txnLogDir.length() - 1) ==
File.separatorChar ? File.separator : "")
- + "txnLogs" //if the string doesn't have a trailing / add
one
- + File.separator));
+ txnLogDirs
+ .add(new TransactionLogDir(node.getId(),
+ txnLogDir
+ + (txnLogDir.charAt(txnLogDir.length() -
1) == File.separatorChar ? File.separator
+ : "")
+ + "txnLogs" //if the string doesn't have a
trailing / add one
+ + File.separator));
}
configuration.setMetadataNode(metadataNodeId);
configuration.setCoredump(coredump);
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>