Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2195
Change subject: [NO ISSUE][TEST] Add NC Storage API Test
......................................................................
[NO ISSUE][TEST] Add NC Storage API Test
- user model changes: no
- storage format changes: no
- interface changes: yes
Add IPartitionReplica to use it at the
APIs level.
Details:
- Add option to TestExecutor to target specific
NC end point.
- Add storage API test case.
Change-Id: I76c336a66e32036a34d30ce6d3a31195c342c4a9
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-app/src/main/resources/cluster.xml
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
A
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
A asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
A
asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
M asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
M
asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M
asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M
asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml
M
asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
M
asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
M asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml
M
asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
M asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml
A
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
M asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
M asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
34 files changed, 836 insertions(+), 15 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/95/2195/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2a8a831..98dabc9 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -321,6 +321,7 @@
@Override
public void stop() throws Exception {
// ungraceful shutdown
+ webManager.stop();
}
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
new file mode 100644
index 0000000..1f9b144
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class StorageApiServlet extends AbstractServlet {
+
+ private static final Logger LOGGER =
Logger.getLogger(StorageApiServlet.class.getName());
+ private final INcApplicationContext appCtx;
+
+ public StorageApiServlet(ConcurrentMap<String, Object> ctx,
INcApplicationContext appCtx, String... paths) {
+ super(ctx, paths);
+ this.appCtx = appCtx;
+ }
+
+ @Override
+ protected void get(IServletRequest request, IServletResponse response)
throws IOException {
+ HttpUtil.setContentType(response,
HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
+ PrintWriter responseWriter = response.writer();
+ try {
+ JsonNode json;
+ response.setStatus(HttpResponseStatus.OK);
+ final String path = localPath(request);
+ if ("".equals(path)) {
+ json = getStatus(p -> true);
+ } else if (path.startsWith("/partition")) {
+ json = getPartitionStatus(path);
+ } else {
+ throw new IllegalArgumentException();
+ }
+ JSONUtil.writeNode(responseWriter, json);
+ } catch (IllegalArgumentException e) {
+ LOGGER.log(Level.INFO, "Unrecognized path: " + request, e);
+ response.setStatus(HttpResponseStatus.NOT_FOUND);
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, "exception thrown for " + request, e);
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ responseWriter.write(e.toString());
+ }
+ responseWriter.flush();
+ }
+
+ @Override
+ protected void post(IServletRequest request, IServletResponse response)
throws Exception {
+ switch (localPath(request)) {
+ case "/addReplica":
+ processAddReplica(request, response);
+ break;
+ case "/removeReplica":
+ processRemoveReplica(request, response);
+ break;
+ default:
+ sendError(response, HttpResponseStatus.NOT_FOUND);
+ break;
+ }
+ }
+
+ private JsonNode getPartitionStatus(String path) {
+ String[] token = path.split("/");
+ if (token.length != 3) {
+ throw new IllegalArgumentException();
+ }
+ // get the partition number from the path
+ final Integer partition = Integer.valueOf(token[2]);
+ return getStatus(partition::equals);
+ }
+
+ private JsonNode getStatus(Predicate<Integer> predicate) {
+ final ArrayNode status = OBJECT_MAPPER.createArrayNode();
+ final IStorageSubsystem storageSubsystem =
appCtx.getStorageSubsystem();
+ final Set<Integer> partitions =
+
storageSubsystem.getPartitions().stream().filter(predicate).collect(Collectors.toSet());
+ for (Integer partition : partitions) {
+ final ObjectNode partitionJson = OBJECT_MAPPER.createObjectNode();
+ partitionJson.put("partition", partition);
+ final List<IPartitionReplica> replicas =
storageSubsystem.getReplicas(partition);
+ ArrayNode replicasArray = OBJECT_MAPPER.createArrayNode();
+ for (IPartitionReplica replica : replicas) {
+ final ObjectNode replicaJson =
OBJECT_MAPPER.createObjectNode();
+ replicaJson.put("location",
replica.getIdentifier().getLocation().toString());
+ replicaJson.put("status", replica.getStatus().toString());
+ replicasArray.add(replicaJson);
+ }
+ partitionJson.set("replicas", replicasArray);
+ status.add(partitionJson);
+ }
+ return status;
+ }
+
+ private void processAddReplica(IServletRequest request, IServletResponse
response) {
+ final ReplicaIdentifier replicaIdentifier =
getReplicaIdentifier(request);
+ if (replicaIdentifier == null) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ appCtx.getStorageSubsystem().addReplica(replicaIdentifier);
+ response.setStatus(HttpResponseStatus.OK);
+ }
+
+ private void processRemoveReplica(IServletRequest request,
IServletResponse response) {
+ final ReplicaIdentifier replicaIdentifier =
getReplicaIdentifier(request);
+ if (replicaIdentifier == null) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ appCtx.getStorageSubsystem().removeReplica(replicaIdentifier);
+ response.setStatus(HttpResponseStatus.OK);
+ }
+
+ private ReplicaIdentifier getReplicaIdentifier(IServletRequest request) {
+ final String partition = request.getParameter("partition");
+ final String host = request.getParameter("host");
+ final String port = request.getParameter("port");
+ if (partition == null || host == null || port == null) {
+ return null;
+ }
+ final InetSocketAddress replicaAddress = new InetSocketAddress(host,
Integer.valueOf(port));
+ return ReplicaIdentifier.of(Integer.valueOf(partition),
replicaAddress);
+ }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index a3def26..5cae2d6 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -22,12 +22,14 @@
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
@@ -58,6 +60,7 @@
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IStorageSubsystem;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -138,6 +141,7 @@
private final NCExtensionManager ncExtensionManager;
private final IStorageComponentProvider componentProvider;
private IHyracksClientConnection hcc;
+ private IStorageSubsystem storageSubsystem;
public NCAppRuntimeContext(INCServiceContext ncServiceContext,
List<AsterixExtension> extensions)
throws AsterixException, InstantiationException,
IllegalAccessException, ClassNotFoundException,
@@ -204,15 +208,17 @@
datasetLifecycleManager =
new DatasetLifecycleManager(storageProperties,
localResourceRepository, txnSubsystem.getLogManager(),
datasetMemoryManager, ioManager.getIODevices().size());
-
+ final String nodeId = getServiceContext().getNodeId();
+ final ClusterPartition[] nodePartitions =
metadataProperties.getNodePartitions().get(nodeId);
+ final Set<Integer> nodePartitionsIds =
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
+ .collect(Collectors.toSet());
+ storageSubsystem = new StorageSubsystem(nodePartitionsIds);
isShuttingdown = false;
-
activeManager = new ActiveManager(threadExecutor,
getServiceContext().getNodeId(),
activeProperties.getMemoryComponentGlobalBudget(),
compilerProperties.getFrameSize(),
this.ncServiceContext);
if
(replicationProperties.isParticipant(getServiceContext().getNodeId())) {
- String nodeId = getServiceContext().getNodeId();
replicaResourcesManager = new
ReplicaResourcesManager(localResourceRepository, metadataProperties);
@@ -518,4 +524,9 @@
}
return hcc;
}
+
+ @Override
+ public IStorageSubsystem getStorageSubsystem() {
+ return storageSubsystem;
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
new file mode 100644
index 0000000..e455d3c8
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.asterix.replication.storage.PartitionReplica;
+
+public class StorageSubsystem implements IStorageSubsystem {
+
+ /**
+ * the partitions to which the current node is master
+ */
+ private final Set<Integer> partitions = new HashSet<>();
+ /**
+ * current replicas
+ */
+ private final Map<ReplicaIdentifier, PartitionReplica> replicas = new
HashMap<>();
+
+ public StorageSubsystem(Set<Integer> partitions) {
+ this.partitions.addAll(partitions);
+ }
+
+ @Override
+ public synchronized void addReplica(ReplicaIdentifier id) {
+ if (!partitions.contains(id.getPartition())) {
+ throw new IllegalStateException(
+ "This node is not the current master of partition(" +
id.getPartition() + ")");
+ }
+ replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
+ replicas.get(id).sync();
+ }
+
+ @Override
+ public synchronized void removeReplica(ReplicaIdentifier id) {
+ if (!replicas.containsKey(id)) {
+ throw new IllegalStateException("replica with id(" + id + ") does
not exist");
+ }
+ replicas.remove(id);
+ }
+
+ @Override
+ public List<IPartitionReplica> getReplicas(int partition) {
+ return replicas.entrySet().stream().filter(e ->
e.getKey().getPartition() == partition).map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Set<Integer> getPartitions() {
+ return Collections.unmodifiableSet(partitions);
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 8b417a9..36ed35d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -25,6 +25,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.api.http.server.StorageApiServlet;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import
org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.common.api.AsterixThreadFactory;
@@ -41,6 +43,7 @@
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.utils.PrintUtil;
+import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.event.schema.cluster.Cluster;
@@ -59,6 +62,7 @@
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.BaseNCApplication;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.WebManager;
public class NCApplication extends BaseNCApplication {
@@ -137,7 +141,11 @@
}
protected void configureServers() throws Exception {
- // override to start web services on NC nodes
+ HttpServer apiServer = new HttpServer(webManager.getBosses(),
webManager.getWorkers(),
+
getApplicationContext().getExternalProperties().getNcApiPort());
+ apiServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR,
ncServiceCtx);
+ apiServer.addServlet(new StorageApiServlet(apiServer.ctx(),
getApplicationContext(), Servlets.STORAGE));
+ webManager.add(apiServer);
}
protected List<AsterixExtension> getExtensions() {
@@ -210,8 +218,8 @@
StorageProperties storageProperties =
runtimeContext.getStorageProperties();
// Deducts the reserved buffer cache size and memory component size
from the maxium heap size,
// and deducts one core for processing heartbeats.
- long memorySize = Runtime.getRuntime().maxMemory() -
storageProperties.getBufferCacheSize()
- - storageProperties.getMemoryComponentGlobalBudget();
+ long memorySize = Runtime.getRuntime().maxMemory() -
storageProperties.getBufferCacheSize() - storageProperties
+ .getMemoryComponentGlobalBudget();
int allCores = Runtime.getRuntime().availableProcessors();
int maximumCoresForComputation = allCores > 1 ? allCores - 1 :
allCores;
return new NodeCapacity(memorySize, maximumCoresForComputation);
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml
b/asterixdb/asterix-app/src/main/resources/cluster.xml
index 7f78b26..7b7d52a 100644
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ b/asterixdb/asterix-app/src/main/resources/cluster.xml
@@ -48,10 +48,12 @@
<id>nc1</id>
<cluster_ip>127.0.0.1</cluster_ip>
<replication_port>2016</replication_port>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<replication_port>2017</replication_port>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 0d66256..2b711ff 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -57,6 +57,7 @@
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
import org.apache.asterix.app.external.IExternalUDFLibrarian;
import org.apache.asterix.common.api.Duration;
@@ -129,6 +130,8 @@
private static Method managixExecuteMethod = null;
private static final HashMap<Integer, ITestServer> runningTestServers =
new HashMap<>();
+ private static Map<String, InetSocketAddress> ncEndPoints;
+ private static Map<String, InetSocketAddress> replicationAddress;
/*
* Instance members
@@ -156,6 +159,14 @@
public void setLibrarian(IExternalUDFLibrarian librarian) {
this.librarian = librarian;
+ }
+
+ public void setNcEndPoints(Map<String, InetSocketAddress> ncEndPoints) {
+ this.ncEndPoints = ncEndPoints;
+ }
+
+ public void setNcReplicationAddress(Map<String, InetSocketAddress>
replicationAddress) {
+ this.replicationAddress = replicationAddress;
}
/**
@@ -1139,7 +1150,10 @@
// we only reach here if the loop is over
testLoops.remove(testFile);
break;
-
+ case "sto":
+ command = stripJavaComments(statement).trim().split(" ");
+ executeStorageCommand(command);
+ break;
default:
throw new IllegalArgumentException("No statements of type " +
ctx.getType());
}
@@ -1507,15 +1521,26 @@
}
protected URI createEndpointURI(String path, String query) throws
URISyntaxException {
- int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
- InetSocketAddress endpoint = endpoints.get(endpointIdx);
+ InetSocketAddress endpoint;
+ if (!path.startsWith("nc:")) {
+ int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+ endpoint = endpoints.get(endpointIdx);
+ } else {
+ final String[] tokens = path.split(" ");
+ if (tokens.length != 2) {
+ throw new IllegalArgumentException("Unrecognized http
pattern");
+ }
+ String nodeId = tokens[0].substring(3);
+ endpoint = getNcEndPoint(nodeId);
+ path = tokens[1];
+ }
URI uri = new URI("http", null, endpoint.getHostString(),
endpoint.getPort(), path, query, null);
LOGGER.fine("Created endpoint URI: " + uri);
return uri;
}
public URI getEndpoint(String servlet) throws URISyntaxException {
- return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""),
null);
+ return createEndpointURI(Servlets.getAbsolutePath(getPath(servlet)),
null);
}
public static String stripJavaComments(String text) {
@@ -1619,6 +1644,41 @@
LOGGER.info("Cluster state now " + desiredState);
}
+ private void executeStorageCommand(String[] command) throws Exception {
+ String srcNode = command[0];
+ String api = command[1];
+ final URI endpoint = getEndpoint(srcNode + " " +
Servlets.getAbsolutePath(Servlets.STORAGE) + api);
+ String partition = command[2];
+ String destNode = command[3];
+ final InetSocketAddress destAddress =
getNcReplicationAddress(destNode);
+ List<Parameter> parameters = new ArrayList<>(3);
+ Stream.of("partition", "host", "port").forEach(arg -> {
+ Parameter p = new Parameter();
+ p.setName(arg);
+ parameters.add(p);
+ });
+ parameters.get(0).setValue(partition);
+ parameters.get(1).setValue(destAddress.getHostName());
+ parameters.get(2).setValue(String.valueOf(destAddress.getPort()));
+ final HttpUriRequest httpUriRequest = constructPostMethod(endpoint,
parameters);
+ final HttpResponse httpResponse = executeHttpRequest(httpUriRequest);
+ Assert.assertEquals(HttpStatus.SC_OK,
httpResponse.getStatusLine().getStatusCode());
+ }
+
+ private InetSocketAddress getNcEndPoint(String nodeId) {
+ if (ncEndPoints == null || !ncEndPoints.containsKey(nodeId)) {
+ throw new IllegalStateException("No end point specified for node:
" + nodeId);
+ }
+ return ncEndPoints.get(nodeId);
+ }
+
+ private InetSocketAddress getNcReplicationAddress(String nodeId) {
+ if (replicationAddress == null ||
!replicationAddress.containsKey(nodeId)) {
+ throw new IllegalStateException("No replication address specified
for node: " + nodeId);
+ }
+ return replicationAddress.get(nodeId);
+ }
+
abstract static class TestLoop extends Exception {
private final String target;
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
new file mode 100644
index 0000000..90605df
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the SQL++ runtime tests with the storage parallelism.
+ */
+@RunWith(Parameterized.class)
+public class ReplicationExecutionTest {
+ protected static final String TEST_CONFIG_FILE_NAME =
"asterix-build-configuration.xml";
+ private static final TestExecutor testExecutor = new TestExecutor();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ final NodeControllerService[] ncs =
ExecutionTestUtil.integrationUtil.ncs;
+ Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ final INcApplicationContext appCtx = (INcApplicationContext)
nc.getApplicationContext();
+ int apiPort = appCtx.getExternalProperties().getNcApiPort();
+ int replicationPort =
appCtx.getReplicationProperties().getDataReplicationPort(nodeId);
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip,
apiPort));
+ replicationAddress.put(nodeId,
InetSocketAddress.createUnresolved(ip, replicationPort));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
+ testExecutor.setNcReplicationAddress(replicationAddress);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "ReplicationExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("replication.xml", "replication.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public ReplicationExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
new file mode 100644
index 0000000..e7c015a
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+nc:asterix_nc1 /addReplica 0 asterix_nc2
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
new file mode 100644
index 0000000..d287fad
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /admin/storage/partition/0
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
new file mode 100644
index 0000000..a635676
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
@@ -0,0 +1,28 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org"
ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+ QueryFileExtension=".sqlpp">
+ <test-group name="replication">
+ <test-case FilePath="replication">
+ <compilation-unit name="add_replica">
+ <output-dir compare="Text">add_replica</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
new file mode 100644
index 0000000..3553d9c
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
@@ -0,0 +1,7 @@
+[ {
+ "partition" : 0,
+ "replicas" : [ {
+ "location" : "127.0.0.1:2017",
+ "status" : "DISCONNECTED"
+ } ]
+} ]
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8afa66d..162e693 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IStorageSubsystem;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -115,4 +116,6 @@
@Override
INCServiceContext getServiceContext();
+
+ IStorageSubsystem getStorageSubsystem();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index bde8303..e718903 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -18,7 +18,9 @@
*/
package org.apache.asterix.common.config;
-import static org.apache.hyracks.control.common.config.OptionTypes.*;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.LEVEL;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
@@ -31,6 +33,7 @@
WEB_QUERYINTERFACE_PORT(INTEGER, 19006, "The listen port of the query
web interface"),
API_PORT(INTEGER, 19002, "The listen port of the API server"),
ACTIVE_PORT(INTEGER, 19003, "The listen port of the active server"),
+ NC_API_PORT(INTEGER, 19004, "The listen port of the node controller
API server"),
LOG_LEVEL(LEVEL, java.util.logging.Level.WARNING, "The logging level
for master and slave processes"),
MAX_WAIT_ACTIVE_CLUSTER(INTEGER, 60, "The max pending time (in
seconds) for cluster startup. After the " +
"threshold, if the cluster still is not up and running, it is
considered unavailable"),
@@ -55,6 +58,8 @@
case API_PORT:
case ACTIVE_PORT:
return Section.CC;
+ case NC_API_PORT:
+ return Section.NC;
case LOG_LEVEL:
case MAX_WAIT_ACTIVE_CLUSTER:
return Section.COMMON;
@@ -117,4 +122,8 @@
public String getCCJavaParams() {
return accessor.getString(Option.CC_JAVA_OPTS);
}
+
+ public int getNcApiPort() {
+ return accessor.getInt(Option.NC_API_PORT);
+ }
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index eacc18b..028a871 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -59,6 +59,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.utils.ConfigUtil;
+import org.apache.asterix.event.schema.cluster.Node;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -142,6 +143,11 @@
// marking node as virtual, as we're not using NCServices with
old-style config
configManager.set(store.getNcId(),
NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED);
}
+ // populate nc api port from cluster properties
+ final ExternalProperties.Option آncApiPort =
ExternalProperties.Option.NC_API_PORT;
+ for (Node node :
ClusterProperties.INSTANCE.getCluster().getNode()) {
+ configManager.set(node.getId(), آncApiPort,
node.getNcApiPort().intValue());
+ }
// Get extensions
if (asterixConfiguration.getExtensions() != null) {
for (Extension ext :
asterixConfiguration.getExtensions().getExtension()) {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
new file mode 100644
index 0000000..034d668
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.exceptions;
+
+public class ReplicationException extends RuntimeException {
+
+ public ReplicationException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
new file mode 100644
index 0000000..5a9dc3f
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+
+public interface IPartitionReplica {
+
+ enum PartitionReplicaStatus {
+ /* replica is in-sync with master */
+ IN_SYNC,
+ /* replica is still catching up with master */
+ CATCHING_UP,
+ /* replica is not connected with master */
+ DISCONNECTED
+ }
+
+ /**
+ * Gets the status of a replica.
+ *
+ * @return The status
+ */
+ PartitionReplicaStatus getStatus();
+
+ /**
+ * Gets the identifier of a replica
+ *
+ * @return The identifier
+ */
+ ReplicaIdentifier getIdentifier();
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
new file mode 100644
index 0000000..7a85696
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.replication.IPartitionReplica;
+
+public interface IStorageSubsystem {
+
+ /**
+ * Adds a replica with the specified {@code id}
+ *
+ * @param id
+ */
+ void addReplica(ReplicaIdentifier id);
+
+ /**
+ * Removes the replica with the specified {@code id}
+ *
+ * @param id
+ */
+ void removeReplica(ReplicaIdentifier id);
+
+ /**
+ * The existing replicas of the partition {@code partition}
+ *
+ * @param partition
+ * @return The list of replicas
+ */
+ List<IPartitionReplica> getReplicas(int partition);
+
+ /**
+ * Gets the list of partition to which the current node is
+ * the master of.
+ *
+ * @return The list of partition
+ */
+ Set<Integer> getPartitions();
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
new file mode 100644
index 0000000..01ffba6
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.net.InetSocketAddress;
+
+public class ReplicaIdentifier {
+
+ private final int partition;
+ private final InetSocketAddress location;
+ private final String id;
+
+ private ReplicaIdentifier(int partition, InetSocketAddress location) {
+ this.partition = partition;
+ this.location = location;
+ id = partition + "@" + location.toString();
+ }
+
+ public static ReplicaIdentifier of(int partition, InetSocketAddress
location) {
+ return new ReplicaIdentifier(partition, location);
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public InetSocketAddress getLocation() {
+ return location;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReplicaIdentifier that = (ReplicaIdentifier) o;
+ return id.equals(that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index 0f7ab4d..1ac3ffa 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -42,7 +42,12 @@
public static final String CLUSTER_STATE_CC_DETAIL = "/admin/cluster/cc/*";
public static final String DIAGNOSTICS = "/admin/diagnostics";
public static final String ACTIVE_STATS = "/admin/active/*";
+ public static final String STORAGE = "/admin/storage/*";
private Servlets() {
}
+
+ public static String getAbsolutePath(String servlet) {
+ return servlet.replaceAll("/\\*$", "");
+ }
}
diff --git a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
index 8cd7b42..be189e3 100644
--- a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
@@ -58,6 +58,7 @@
<xs:element name="cc_root" type="xs:string" />
<xs:element name="strategy" type="xs:string" />
<xs:element name="node_id" type="xs:string" />
+ <xs:element name="nc_api_port" type="xs:integer" />
<!-- definition of complex elements -->
<xs:element name="working_dir">
@@ -143,6 +144,7 @@
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:debug_port" minOccurs="0" />
<xs:element ref="cl:replication_port" minOccurs="0" />
+ <xs:element ref="cl:nc_api_port" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
diff --git
a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index 415433e..c7f16fc 100644
---
a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++
b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -42,7 +42,8 @@
public static final String CLIENT_NODE_ID = "client_node";
public static final String CLUSTER_IP = "127.0.0.1";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID,
CLUSTER_IP, null, null, null, null, null, null);
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID,
CLUSTER_IP, null, null, null, null, null, null,
+ null);
private static String eventsDir;
private static Map<String, String> env = new HashMap<String, String>();
diff --git
a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index 4dc3124..9d684ec 100644
---
a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++
b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -186,7 +186,7 @@
String javaHome = cluster.getMasterNode().getJavaHome() == null ?
cluster.getJavaHome() : cluster
.getMasterNode().getJavaHome();
return new Node(cluster.getMasterNode().getId(),
cluster.getMasterNode().getClusterIp(), javaHome, logDir,
- null, null, cluster.getMasterNode().getDebugPort(), null);
+ null, null, cluster.getMasterNode().getDebugPort(), null,
null);
}
List<Node> nodeList = cluster.getNode();
diff --git
a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 748d811..3e079bd 100644
---
a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++
b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -105,7 +105,7 @@
MasterNode masterNode = cluster.getMasterNode();
Node master = new Node(masterNode.getId(),
masterNode.getClusterIp(), masterNode.getJavaHome(),
- masterNode.getLogDir(), null, null, null, null);
+ masterNode.getLogDir(), null, null, null, null, null);
ipAddresses.add(masterNode.getClusterIp());
valid = valid & validateNodeConfiguration(master, cluster);
diff --git
a/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml
b/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml
index 2721eec..500172c 100644
--- a/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml
+++ b/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml
@@ -45,11 +45,13 @@
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/node1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/node1/1,/tmp/asterix/node1/2</iodevices>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>node2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/node2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/node2/1,/tmp/asterix/node2/2</iodevices>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
diff --git
a/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml
b/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml
index f2afe5e..b26d836 100644
--- a/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml
+++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml
@@ -51,12 +51,13 @@
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
-
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
diff --git
a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
index 57d04c7..c445835 100644
---
a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
+++
b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
@@ -68,6 +68,7 @@
<txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
<replication_port>2000</replication_port>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
@@ -75,5 +76,6 @@
<txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
<replication_port>2001</replication_port>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
\ No newline at end of file
diff --git
a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
index 7a435b7..fbe0de8 100644
---
a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
+++
b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
@@ -72,6 +72,7 @@
<txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
<replication_port>2000</replication_port>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
@@ -79,5 +80,6 @@
<txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
<replication_port>2001</replication_port>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
\ No newline at end of file
diff --git
a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml
b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml
index 9eb728f..5ad3921 100644
--- a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml
+++ b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml
@@ -46,9 +46,11 @@
<node>
<id>nc1</id>
<cluster_ip>10.10.0.3</cluster_ip>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>10.10.0.4</cluster_ip>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
diff --git
a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
index bb66131..003b3c8 100644
---
a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
+++
b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
@@ -55,9 +55,11 @@
<node>
<id>nc1</id>
<cluster_ip>10.10.0.3</cluster_ip>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>10.10.0.4</cluster_ip>
+ <nc_api_port>19005</nc_api_port>
</node>
</cluster>
\ No newline at end of file
diff --git
a/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml
b/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml
index 4d5d0bd..ed82a55 100644
--- a/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml
+++ b/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml
@@ -60,17 +60,21 @@
<node>
<id>nc1</id>
<cluster_ip>172.20.0.3</cluster_ip>
+ <nc_api_port>19004</nc_api_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>172.20.0.4</cluster_ip>
+ <nc_api_port>19005</nc_api_port>
</node>
<node>
<id>nc3</id>
<cluster_ip>172.20.0.5</cluster_ip>
+ <nc_api_port>19006</nc_api_port>
</node>
<node>
<id>nc4</id>
<cluster_ip>172.20.0.6</cluster_ip>
+ <nc_api_port>19007</nc_api_port>
</node>
</cluster>
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
new file mode 100644
index 0000000..9c1646a
--- /dev/null
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.storage;
+
+import static
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.CATCHING_UP;
+import static
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
+import static
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+@ThreadSafe
+public class PartitionReplica implements IPartitionReplica {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final ReplicaIdentifier id;
+ private SocketChannel sc;
+ private PartitionReplicaStatus status = DISCONNECTED;
+
+ public PartitionReplica(ReplicaIdentifier id) {
+ this.id = id;
+ }
+
+ @Override
+ public synchronized PartitionReplicaStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public ReplicaIdentifier getIdentifier() {
+ return id;
+ }
+
+ public synchronized void sync() {
+ if (status == IN_SYNC || status == CATCHING_UP) {
+ return;
+ }
+ }
+
+ public JsonNode asJson() {
+ ObjectNode json = OBJECT_MAPPER.createObjectNode();
+ json.put("id", id.toString());
+ json.put("state", status.name());
+ return json;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionReplica that = (PartitionReplica) o;
+ return id.equals(that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return JSONUtil.convertNode(asJson());
+ } catch (JsonProcessingException e) {
+ throw new ReplicationException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
index 184728d..bec2122 100644
--- a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
+++ b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
@@ -19,12 +19,14 @@
txn.log.dir=data/red/txnlog
core.dump.dir=data/red/coredump
iodevices=data/red
+nc.api.port=19004
[nc/blue]
ncservice.port=9091
txn.log.dir=data/blue/txnlog
core.dump.dir=data/blue/coredump
iodevices=data/blue
+nc.api.port=19005
${NC_BLUE_EXTRA}
[nc]
diff --git
a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
index 2a1c652..a6cb064 100644
--- a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
+++ b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
@@ -19,6 +19,7 @@
txn.log.dir=../asterix-server/target/tmp/asterix_nc1/txnlog
core.dump.dir=../asterix-server/target/tmp/asterix_nc1/coredump
iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
[nc/asterix_nc2]
@@ -26,6 +27,7 @@
txn.log.dir=../asterix-server/target/tmp/asterix_nc2/txnlog
core.dump.dir=../asterix-server/target/tmp/asterix_nc2/coredump
iodevices=../asterix-server/target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
[nc]
--
To view, visit https://asterix-gerrit.ics.uci.edu/2195
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I76c336a66e32036a34d30ce6d3a31195c342c4a9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>