Murtadha Hubail has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2190
Change subject: [NO ISSUE][API] Add NC Storage API ...................................................................... [NO ISSUE][API] Add NC Storage API - user model changes: no - storage format changes: no - interface changes: yes Add IStorageSubsystem to track storage partitions replicas. Details: - Add NC API port. - Add storage API to NCs. - Add StorageSubsystem to track storage partitions replicas. Change-Id: I120d9892bc9fe5a73395cd5a2ddc30b51b73ced2 --- A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M asterixdb/asterix-app/src/main/resources/cluster.xml M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java M asterixdb/asterix-common/src/main/resources/schema/cluster.xsd M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java M asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java M asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml M 
asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml M asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml M asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml M asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml M asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml M asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml M asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf 24 files changed, 530 insertions(+), 11 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/90/2190/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java new file mode 100644 index 0000000..d8636c8 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.api.http.server; + +import java.io.IOException; +import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Predicate; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.storage.IStorageSubsystem; +import org.apache.asterix.common.storage.PartitionReplica; +import org.apache.asterix.common.storage.ReplicaIdentifier; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.http.server.utils.HttpUtil; +import org.apache.hyracks.util.JSONUtil; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import io.netty.handler.codec.http.HttpResponseStatus; + +public class StorageApiServlet extends AbstractServlet { + + private static final Logger LOGGER = Logger.getLogger(StorageApiServlet.class.getName()); + private final INcApplicationContext appCtx; + + public StorageApiServlet(ConcurrentMap<String, Object> ctx, INcApplicationContext appCtx, String... 
paths) { + super(ctx, paths); + this.appCtx = appCtx; + } + + @Override + protected void get(IServletRequest request, IServletResponse response) throws IOException { + HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8); + PrintWriter responseWriter = response.writer(); + try { + JsonNode json; + response.setStatus(HttpResponseStatus.OK); + final String path = localPath(request); + if ("".equals(path)) { + json = getStatus(p -> true); + } else if (path.startsWith("/partition")) { + json = getPartitionStatus(path); + } else { + throw new IllegalArgumentException(); + } + JSONUtil.writeNode(responseWriter, json); + } catch (IllegalArgumentException e) { + LOGGER.log(Level.INFO, "Unrecognized path: " + request, e); + response.setStatus(HttpResponseStatus.NOT_FOUND); + } catch (Exception e) { + LOGGER.log(Level.INFO, "exception thrown for " + request, e); + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + responseWriter.write(e.toString()); + } + responseWriter.flush(); + } + + @Override + protected void post(IServletRequest request, IServletResponse response) throws Exception { + switch (localPath(request)) { + case "/addReplica": + processAddReplica(request, response); + break; + case "/removeReplica": + processRemoveReplica(request, response); + break; + default: + sendError(response, HttpResponseStatus.NOT_FOUND); + break; + } + } + + private JsonNode getPartitionStatus(String path) { + String[] token = path.split("/"); + if (token.length != 3) { + throw new IllegalArgumentException(); + } + // get the partition number from the path + final Integer partition = Integer.valueOf(token[2]); + return getStatus(partition::equals); + } + + private JsonNode getStatus(Predicate<Integer> predicate) { + final ArrayNode status = OBJECT_MAPPER.createArrayNode(); + final IStorageSubsystem storageSubsystem = appCtx.getStorageSubsystem(); + final Set<Integer> partitions = + 
storageSubsystem.getPartitions().stream().filter(predicate).collect(Collectors.toSet()); + for (Integer partition : partitions) { + final ObjectNode partitionJson = OBJECT_MAPPER.createObjectNode(); + partitionJson.put("partition", partition); + final List<PartitionReplica> replicas = storageSubsystem.getReplicas(partition); + ArrayNode replicasArray = OBJECT_MAPPER.createArrayNode(); + for (PartitionReplica replica : replicas) { + final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode(); + replicaJson.put("location", replica.getIdentifier().getLocation().toString()); + replicaJson.put("status", replica.getStatus().toString()); + replicasArray.add(replicaJson); + } + partitionJson.set("replicas", replicasArray); + status.add(partitionJson); + } + return status; + } + + private void processAddReplica(IServletRequest request, IServletResponse response) { + final ReplicaIdentifier replicaIdentifier = getReplicaIdentifier(request); + if (replicaIdentifier == null) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + appCtx.getStorageSubsystem().addReplica(replicaIdentifier); + response.setStatus(HttpResponseStatus.OK); + } + + private void processRemoveReplica(IServletRequest request, IServletResponse response) { + final ReplicaIdentifier replicaIdentifier = getReplicaIdentifier(request); + if (replicaIdentifier == null) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + appCtx.getStorageSubsystem().removeReplica(replicaIdentifier); + response.setStatus(HttpResponseStatus.OK); + } + + private ReplicaIdentifier getReplicaIdentifier(IServletRequest request) { + final String partition = request.getParameter("partition"); + final String host = request.getParameter("host"); + final String port = request.getParameter("port"); + if (partition == null || host == null || port == null) { + return null; + } + final InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(host, Integer.valueOf(port)); + return 
ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress); + } +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index a3def26..5cae2d6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -22,12 +22,14 @@ import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import org.apache.asterix.active.ActiveManager; import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery; @@ -58,6 +60,7 @@ import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationChannel; import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.storage.IStorageSubsystem; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; @@ -138,6 +141,7 @@ private final NCExtensionManager ncExtensionManager; private final IStorageComponentProvider componentProvider; private IHyracksClientConnection hcc; + private IStorageSubsystem storageSubsystem; public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions) throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException, @@ -204,15 +208,17 @@ datasetLifecycleManager = new 
DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(), datasetMemoryManager, ioManager.getIODevices().size()); - + final String nodeId = getServiceContext().getNodeId(); + final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId); + final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId) + .collect(Collectors.toSet()); + storageSubsystem = new StorageSubsystem(nodePartitionsIds); isShuttingdown = false; - activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(), activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(), this.ncServiceContext); if (replicationProperties.isParticipant(getServiceContext().getNodeId())) { - String nodeId = getServiceContext().getNodeId(); replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties); @@ -518,4 +524,9 @@ } return hcc; } + + @Override + public IStorageSubsystem getStorageSubsystem() { + return storageSubsystem; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java new file mode 100644 index 0000000..0d81778 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.nc; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.asterix.common.storage.IStorageSubsystem; +import org.apache.asterix.common.storage.PartitionReplica; +import org.apache.asterix.common.storage.ReplicaIdentifier; + +public class StorageSubsystem implements IStorageSubsystem { + + /** + * the partitions to which the current node is master + */ + private final Set<Integer> partitions = new HashSet<>(); + /** + * current replicas + */ + private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>(); + + public StorageSubsystem(Set<Integer> partitions) { + this.partitions.addAll(partitions); + } + + @Override + public synchronized void addReplica(ReplicaIdentifier id) { + if (!partitions.contains(id.getPartition())) { + throw new IllegalStateException( + "This node is not the current master of partition(" + id.getPartition() + ")"); + } + replicas.computeIfAbsent(id, key -> new PartitionReplica(key)); + replicas.get(id); + } + + @Override + public synchronized void removeReplica(ReplicaIdentifier id) { + if (!replicas.containsKey(id)) { + throw new IllegalStateException("replica with id(" + id + ") does not exist"); + } + replicas.remove(id); + } + + @Override + public List<PartitionReplica> getReplicas(int partition) { + return replicas.entrySet().stream().filter(e -> e.getKey().getPartition() == 
partition).map(Map.Entry::getValue) + .collect(Collectors.toList()); + } + + @Override + public Set<Integer> getPartitions() { + return Collections.unmodifiableSet(partitions); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 8b417a9..36ed35d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -25,6 +25,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.api.http.server.ServletConstants; +import org.apache.asterix.api.http.server.StorageApiServlet; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; import org.apache.asterix.common.api.AsterixThreadFactory; @@ -41,6 +43,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.common.utils.PrintUtil; +import org.apache.asterix.common.utils.Servlets; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.event.schema.cluster.Cluster; @@ -59,6 +62,7 @@ import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.BaseNCApplication; import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.http.server.HttpServer; import org.apache.hyracks.http.server.WebManager; public class NCApplication extends BaseNCApplication { @@ -137,7 +141,11 @@ } protected void configureServers() throws Exception { - // override to start web services on NC nodes + HttpServer apiServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), + getApplicationContext().getExternalProperties().getNcApiPort()); + apiServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ncServiceCtx); + apiServer.addServlet(new StorageApiServlet(apiServer.ctx(), getApplicationContext(), Servlets.STORAGE)); + webManager.add(apiServer); } protected List<AsterixExtension> getExtensions() { @@ -210,8 +218,8 @@ StorageProperties storageProperties = runtimeContext.getStorageProperties(); // Deducts the reserved buffer cache size and memory component size from the maxium heap size, // and deducts one core for processing heartbeats. - long memorySize = Runtime.getRuntime().maxMemory() - storageProperties.getBufferCacheSize() - - storageProperties.getMemoryComponentGlobalBudget(); + long memorySize = Runtime.getRuntime().maxMemory() - storageProperties.getBufferCacheSize() - storageProperties + .getMemoryComponentGlobalBudget(); int allCores = Runtime.getRuntime().availableProcessors(); int maximumCoresForComputation = allCores > 1 ? 
allCores - 1 : allCores; return new NodeCapacity(memorySize, maximumCoresForComputation); diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml index 7f78b26..7b7d52a 100644 --- a/asterixdb/asterix-app/src/main/resources/cluster.xml +++ b/asterixdb/asterix-app/src/main/resources/cluster.xml @@ -48,10 +48,12 @@ <id>nc1</id> <cluster_ip>127.0.0.1</cluster_ip> <replication_port>2016</replication_port> + <nc_api_port>19004</nc_api_port> </node> <node> <id>nc2</id> <cluster_ip>127.0.0.1</cluster_ip> <replication_port>2017</replication_port> + <nc_api_port>19005</nc_api_port> </node> </cluster> \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java index 8afa66d..162e693 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java @@ -28,6 +28,7 @@ import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationChannel; import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.storage.IStorageSubsystem; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.INCServiceContext; @@ -115,4 +116,6 @@ @Override INCServiceContext getServiceContext(); + + IStorageSubsystem getStorageSubsystem(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java index bde8303..e718903 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java @@ -18,7 +18,9 @@ */ package org.apache.asterix.common.config; -import static org.apache.hyracks.control.common.config.OptionTypes.*; +import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER; +import static org.apache.hyracks.control.common.config.OptionTypes.LEVEL; +import static org.apache.hyracks.control.common.config.OptionTypes.STRING; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; @@ -31,6 +33,7 @@ WEB_QUERYINTERFACE_PORT(INTEGER, 19006, "The listen port of the query web interface"), API_PORT(INTEGER, 19002, "The listen port of the API server"), ACTIVE_PORT(INTEGER, 19003, "The listen port of the active server"), + NC_API_PORT(INTEGER, 19004, "The listen port of the node controller API server"), LOG_LEVEL(LEVEL, java.util.logging.Level.WARNING, "The logging level for master and slave processes"), MAX_WAIT_ACTIVE_CLUSTER(INTEGER, 60, "The max pending time (in seconds) for cluster startup. 
After the " + "threshold, if the cluster still is not up and running, it is considered unavailable"), @@ -55,6 +58,8 @@ case API_PORT: case ACTIVE_PORT: return Section.CC; + case NC_API_PORT: + return Section.NC; case LOG_LEVEL: case MAX_WAIT_ACTIVE_CLUSTER: return Section.COMMON; @@ -117,4 +122,8 @@ public String getCCJavaParams() { return accessor.getString(Option.CC_JAVA_OPTS); } + + public int getNcApiPort() { + return accessor.getInt(Option.NC_API_PORT); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java index eacc18b..028a871 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java @@ -59,6 +59,7 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.utils.ConfigUtil; +import org.apache.asterix.event.schema.cluster.Node; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.config.IApplicationConfig; @@ -142,6 +143,11 @@ // marking node as virtual, as we're not using NCServices with old-style config configManager.set(store.getNcId(), NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED); } + // populate nc api port from cluster properties + final ExternalProperties.Option ncApiPort = ExternalProperties.Option.NC_API_PORT; + for (Node node : ClusterProperties.INSTANCE.getCluster().getNode()) { + configManager.set(node.getId(), ncApiPort, node.getNcApiPort().intValue()); + } // Get extensions if (asterixConfiguration.getExtensions() != null) { for (Extension ext : asterixConfiguration.getExtensions().getExtension()) { diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java new file mode 100644 index 0000000..b4f06cb --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.storage; + +import java.util.List; +import java.util.Set; + +public interface IStorageSubsystem { + + /** + * Adds a replica with the specified {@code id} + * + * @param id + */ + void addReplica(ReplicaIdentifier id); + + /** + * Removes the replica with the specified {@code id} + * + * @param id + */ + void removeReplica(ReplicaIdentifier id); + + /** + * The existing replicas of the partition {@code partition} + * + * @param partition + * @return The list of replicas + */ + List<PartitionReplica> getReplicas(int partition); + + /** + * Gets the list of partition to which the current node is + * the master of. 
+ * + * @return The list of partition + */ + Set<Integer> getPartitions(); +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java new file mode 100644 index 0000000..18733ce --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.storage; + +import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.CATCHING_UP; +import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.DISCONNECTED; +import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.IN_SYNC; + +import org.apache.hyracks.util.JSONUtil; +import org.apache.hyracks.util.annotations.ThreadSafe; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +@ThreadSafe +public class PartitionReplica { + + public enum PartitionReplicaStatus { + /* replica is in-sync with master */ + IN_SYNC, + /* replica is still catching up with master */ + CATCHING_UP, + /* replica is not connected with master */ + DISCONNECTED + } + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final ReplicaIdentifier id; + private PartitionReplicaStatus status = DISCONNECTED; + + public PartitionReplica(ReplicaIdentifier id) { + this.id = id; + } + + public synchronized PartitionReplicaStatus getStatus() { + return status; + } + + public ReplicaIdentifier getIdentifier() { + return id; + } + + public synchronized void sync() { + if (status == IN_SYNC || status == CATCHING_UP) { + return; + } + //TODO complete implementation + } + + public JsonNode asJson() { + ObjectNode json = OBJECT_MAPPER.createObjectNode(); + json.put("id", id.toString()); + json.put("state", status.name()); + return json; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionReplica that = (PartitionReplica) o; + return id.equals(that.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + try { + 
return JSONUtil.convertNode(asJson()); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e); + } + } +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java new file mode 100644 index 0000000..2072335 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.storage; + +import java.net.InetSocketAddress; + +public class ReplicaIdentifier { + + private final int partition; + private final InetSocketAddress location; + private final String id; + + private ReplicaIdentifier(int partition, InetSocketAddress location) { + this.partition = partition; + this.location = location; + id = partition + "@" + location.toString(); + } + + public static ReplicaIdentifier of(int partition, InetSocketAddress location) { + return new ReplicaIdentifier(partition, location); + } + + public int getPartition() { + return partition; + } + + public InetSocketAddress getLocation() { + return location; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) { + return false; + } + ReplicaIdentifier that = (ReplicaIdentifier) o; + return id.equals(that.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return id; + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java index 0f7ab4d..d5f23bf 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java @@ -42,6 +42,7 @@ public static final String CLUSTER_STATE_CC_DETAIL = "/admin/cluster/cc/*"; public static final String DIAGNOSTICS = "/admin/diagnostics"; public static final String ACTIVE_STATS = "/admin/active/*"; + public static final String STORAGE = "/admin/storage/*"; private Servlets() { } diff --git a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd index 8cd7b42..be189e3 100644 --- a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd +++ 
b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd @@ -58,6 +58,7 @@ <xs:element name="cc_root" type="xs:string" /> <xs:element name="strategy" type="xs:string" /> <xs:element name="node_id" type="xs:string" /> + <xs:element name="nc_api_port" type="xs:integer" /> <!-- definition of complex elements --> <xs:element name="working_dir"> @@ -143,6 +144,7 @@ <xs:element ref="cl:iodevices" minOccurs="0" /> <xs:element ref="cl:debug_port" minOccurs="0" /> <xs:element ref="cl:replication_port" minOccurs="0" /> + <xs:element ref="cl:nc_api_port" minOccurs="0" /> </xs:sequence> </xs:complexType> </xs:element> diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java index 415433e..ddcedc4 100644 --- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java +++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java @@ -42,7 +42,7 @@ public static final String CLIENT_NODE_ID = "client_node"; public static final String CLUSTER_IP = "127.0.0.1"; - public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, CLUSTER_IP, null, null, null, null, null, null); + public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, CLUSTER_IP, null, null, null, null, null, null, null); private static String eventsDir; private static Map<String, String> env = new HashMap<String, String>(); diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java index 4dc3124..9d684ec 100644 --- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java +++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java @@ -186,7 +186,7 @@ String javaHome = cluster.getMasterNode().getJavaHome() == 
null ? cluster.getJavaHome() : cluster .getMasterNode().getJavaHome(); return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir, - null, null, cluster.getMasterNode().getDebugPort(), null); + null, null, cluster.getMasterNode().getDebugPort(), null, null); } List<Node> nodeList = cluster.getNode(); diff --git a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java index 748d811..3e079bd 100644 --- a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java +++ b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java @@ -105,7 +105,7 @@ MasterNode masterNode = cluster.getMasterNode(); Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(), - masterNode.getLogDir(), null, null, null, null); + masterNode.getLogDir(), null, null, null, null, null); ipAddresses.add(masterNode.getClusterIp()); valid = valid & validateNodeConfiguration(master, cluster); diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml b/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml index 2721eec..500172c 100644 --- a/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml +++ b/asterixdb/asterix-installer/src/main/resources/clusters/demo/demo.xml @@ -45,11 +45,13 @@ <cluster_ip>127.0.0.1</cluster_ip> <txn_log_dir>/tmp/asterix/node1/txnLogs</txn_log_dir> <iodevices>/tmp/asterix/node1/1,/tmp/asterix/node1/2</iodevices> + <nc_api_port>19004</nc_api_port> </node> <node> <id>node2</id> <cluster_ip>127.0.0.1</cluster_ip> <txn_log_dir>/tmp/asterix/node2/txnLogs</txn_log_dir> <iodevices>/tmp/asterix/node2/1,/tmp/asterix/node2/2</iodevices> + <nc_api_port>19005</nc_api_port> </node> </cluster> diff --git 
a/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml b/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml index f2afe5e..b26d836 100644 --- a/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml +++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local.xml @@ -51,12 +51,13 @@ <cluster_ip>127.0.0.1</cluster_ip> <txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir> <iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices> - + <nc_api_port>19004</nc_api_port> </node> <node> <id>nc2</id> <cluster_ip>127.0.0.1</cluster_ip> <txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir> <iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices> + <nc_api_port>19005</nc_api_port> </node> </cluster> diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml index 57d04c7..c445835 100644 --- a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml +++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml @@ -68,6 +68,7 @@ <txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir> <iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices> <replication_port>2000</replication_port> + <nc_api_port>19004</nc_api_port> </node> <node> <id>nc2</id> @@ -75,5 +76,6 @@ <txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir> <iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices> <replication_port>2001</replication_port> + <nc_api_port>19005</nc_api_port> </node> </cluster> \ No newline at end of file diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml index 7a435b7..fbe0de8 100644 --- 
a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml +++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml @@ -72,6 +72,7 @@ <txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir> <iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices> <replication_port>2000</replication_port> + <nc_api_port>19004</nc_api_port> </node> <node> <id>nc2</id> @@ -79,5 +80,6 @@ <txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir> <iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices> <replication_port>2001</replication_port> + <nc_api_port>19005</nc_api_port> </node> </cluster> \ No newline at end of file diff --git a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml index 9eb728f..5ad3921 100644 --- a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml +++ b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster.xml @@ -46,9 +46,11 @@ <node> <id>nc1</id> <cluster_ip>10.10.0.3</cluster_ip> + <nc_api_port>19004</nc_api_port> </node> <node> <id>nc2</id> <cluster_ip>10.10.0.4</cluster_ip> + <nc_api_port>19005</nc_api_port> </node> </cluster> diff --git a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml index bb66131..003b3c8 100644 --- a/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml +++ b/asterixdb/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml @@ -55,9 +55,11 @@ <node> <id>nc1</id> <cluster_ip>10.10.0.3</cluster_ip> + <nc_api_port>19004</nc_api_port> </node> <node> <id>nc2</id> <cluster_ip>10.10.0.4</cluster_ip> + <nc_api_port>19005</nc_api_port> </node> </cluster> \ No newline at end of file diff --git a/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml 
b/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml index 4d5d0bd..ed82a55 100644 --- a/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml +++ b/asterixdb/asterix-installer/src/test/resources/docker/cluster-config.xml @@ -60,17 +60,21 @@ <node> <id>nc1</id> <cluster_ip>172.20.0.3</cluster_ip> + <nc_api_port>19004</nc_api_port> </node> <node> <id>nc2</id> <cluster_ip>172.20.0.4</cluster_ip> + <nc_api_port>19005</nc_api_port> </node> <node> <id>nc3</id> <cluster_ip>172.20.0.5</cluster_ip> + <nc_api_port>19006</nc_api_port> </node> <node> <id>nc4</id> <cluster_ip>172.20.0.6</cluster_ip> + <nc_api_port>19007</nc_api_port> </node> </cluster> diff --git a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf index 2a1c652..a6cb064 100644 --- a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf +++ b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf @@ -19,6 +19,7 @@ txn.log.dir=../asterix-server/target/tmp/asterix_nc1/txnlog core.dump.dir=../asterix-server/target/tmp/asterix_nc1/coredump iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2 +nc.api.port=19004 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006 [nc/asterix_nc2] @@ -26,6 +27,7 @@ txn.log.dir=../asterix-server/target/tmp/asterix_nc2/txnlog core.dump.dir=../asterix-server/target/tmp/asterix_nc2/coredump iodevices=../asterix-server/target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2 +nc.api.port=19005 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007 [nc] -- To view, visit https://asterix-gerrit.ics.uci.edu/2190 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I120d9892bc9fe5a73395cd5a2ddc30b51b73ced2 Gerrit-PatchSet: 1 
Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>