Murtadha Hubail has submitted this change and it was merged.

Change subject: Allow Replication to be Enabled on Virtual Cluster
......................................................................

Allow Replication to be Enabled on Virtual Cluster

- Allow replication port assignment per NC.
- Allow replication to be enabled on virtual cluster.
- Wait for JOB_ABORT ACK from remote replicas.
- Fix LSM component mask file name.
- Fix index directory deletion on index drop.
- Eliminate multiple partition takeover requests.
- Free LogFlusher thread from sending replication ACKs.
- Fix possible deadlock between LogFlusher and Logs Replication Thread.
- Remove wait for FLUSH_LOG for replicated LSM components:
  This wait is not needed since on node failure, complete remote recovery is done.

Change-Id: I34a38f59c4915a19242adb6a4eaa6ee1c82d2372
Reviewed-on: https://asterix-gerrit.ics.uci.edu/743
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ian Maxon <[email protected]>
---
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
A asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
M asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
A 
asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java M asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java M asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java M asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java M asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java 21 files changed, 232 insertions(+), 132 deletions(-) Approvals: Ian Maxon: Looks good to me, approved Jenkins: Verified diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index a568464..c67eb70 100644 --- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -19,6 +19,7 @@ package org.apache.asterix.api.common; import java.io.File; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; @@ -83,6 +84,7 @@ ncConfig1.nodeId = ncName; ncConfig1.resultTTL = 30000; ncConfig1.resultSweepThreshold = 1000; + ncConfig1.appArgs = Arrays.asList("-virtual-NC"); String tempPath = System.getProperty(IO_DIR_KEY); if (tempPath.endsWith(File.separator)) { tempPath = tempPath.substring(0, tempPath.length() - 1); @@ -109,8 +111,19 @@ } } ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName(); - ncs[n] = new NodeControllerService(ncConfig1); - ncs[n].start(); + NodeControllerService nodeControllerService = new NodeControllerService(ncConfig1); + ncs[n] 
= nodeControllerService; + Thread ncStartThread = new Thread() { + @Override + public void run() { + try { + nodeControllerService.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + ncStartThread.start(); ++n; } hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort); diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java index 4922ae6..643bb16 100644 --- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java +++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java @@ -64,6 +64,9 @@ @Option(name = "-initial-run", usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false) public boolean initialRun = false; + @Option(name = "-virtual-NC", usage = "A flag indicating if this NC is running on virtual cluster (default: false)", required = false) + public boolean virtualNC = false; + private INCApplicationContext ncApplicationContext = null; private IAsterixAppRuntimeContext runtimeContext; private String nodeId; @@ -88,7 +91,6 @@ ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager())); ncApplicationContext = ncAppCtx; - nodeId = ncApplicationContext.getNodeId(); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Starting Asterix node controller: " + nodeId); @@ -120,7 +122,8 @@ LOGGER.info("System is in a state: " + systemState); } - if (replicationEnabled) { + //do not attempt to perform remote recovery if this is a virtual NC + if (replicationEnabled && !virtualNC) { if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) { //Try to perform remote recovery IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager(); diff --git 
a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml index 731113b..ff03ab6 100644 --- a/asterix-app/src/main/resources/asterix-build-configuration.xml +++ b/asterix-app/src/main/resources/asterix-build-configuration.xml @@ -98,4 +98,10 @@ <description>Enabling plot of Algebricks plan to tmp folder. (Default = false) </description> </property> + <property> + <name>log.level</name> + <value>WARNING</value> + <description>The minimum log level to be displayed. (Default = INFO) + </description> + </property> </asterixConfiguration> diff --git a/asterix-app/src/main/resources/cluster.xml b/asterix-app/src/main/resources/cluster.xml new file mode 100644 index 0000000..8f0b694 --- /dev/null +++ b/asterix-app/src/main/resources/cluster.xml @@ -0,0 +1,49 @@ +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. 
+ !--> +<cluster xmlns="cluster"> + <instance_name>asterix</instance_name> + <store>storage</store> + + <data_replication> + <enabled>false</enabled> + <replication_port>2016</replication_port> + <replication_factor>2</replication_factor> + <auto_failover>false</auto_failover> + <replication_time_out>30</replication_time_out> + </data_replication> + + <master_node> + <id>master</id> + <client_ip>127.0.0.1</client_ip> + <cluster_ip>127.0.0.1</cluster_ip> + <client_port>1098</client_port> + <cluster_port>1099</cluster_port> + <http_port>8888</http_port> + </master_node> + <node> + <id>nc1</id> + <cluster_ip>127.0.0.1</cluster_ip> + <replication_port>2016</replication_port> + </node> + <node> + <id>nc2</id> + <cluster_ip>127.0.0.1</cluster_ip> + <replication_port>2017</replication_port> + </node> +</cluster> \ No newline at end of file diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java index cb7bcab..5d31d9a 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java @@ -77,10 +77,15 @@ } public int getDataReplicationPort(String nodeId) { - if (cluster != null) { - return cluster.getDataReplication().getReplicationPort().intValue(); + if (cluster != null && cluster.getDataReplication() != null) { + for (int i = 0; i < cluster.getNode().size(); i++) { + Node node = cluster.getNode().get(i); + if (getRealCluserNodeID(node.getId()).equals(nodeId)) { + return node.getReplicationPort() != null ? 
node.getReplicationPort().intValue() + : cluster.getDataReplication().getReplicationPort().intValue(); + } + } } - return REPLICATION_DATAPORT_DEFAULT; } diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java index 9226a66..a88b82a 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.common.replication; +import java.nio.channels.SocketChannel; + import org.apache.asterix.common.transactions.LogRecord; public interface IReplicationThread extends Runnable { @@ -29,4 +31,9 @@ * The log that has been flushed. */ public void notifyLogReplicationRequester(LogRecord logRecord); + + /** + * @return The replication client socket channel. + */ + public SocketChannel getReplicationClientSocket(); } diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd index 935d33f..ae09b16 100644 --- a/asterix-common/src/main/resources/schema/cluster.xsd +++ b/asterix-common/src/main/resources/schema/cluster.xsd @@ -123,6 +123,7 @@ <xs:element ref="cl:txn_log_dir" minOccurs="0" /> <xs:element ref="cl:iodevices" minOccurs="0" /> <xs:element ref="cl:debug_port" minOccurs="0" /> + <xs:element ref="cl:replication_port" minOccurs="0" /> </xs:sequence> </xs:complexType> </xs:element> diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java index 29765fd..c92262c 100644 --- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java +++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java @@ -41,7 +41,7 @@ public class EventDriver { public 
static final String CLIENT_NODE_ID = "client_node"; - public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null); + public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null); private static String eventsDir; private static Events events; diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java index b83faa2..57648b8 100644 --- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java +++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import java.math.BigInteger; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -191,7 +192,7 @@ String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster .getMasterNode().getJavaHome(); return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir, - null, null, cluster.getMasterNode().getDebugPort()); + null, null, cluster.getMasterNode().getDebugPort(), null); } List<Node> nodeList = cluster.getNode(); diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java index 9559394..4037eaf 100644 --- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java +++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java @@ -105,7 +105,7 @@ MasterNode masterNode = cluster.getMasterNode(); Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(), - masterNode.getLogDir(), null, null, null); + masterNode.getLogDir(), null, null, null, 
null); ipAddresses.add(masterNode.getClusterIp()); valid = valid & validateNodeConfiguration(master, cluster); diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java index 3e37694..e7bf3bf 100644 --- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java +++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java @@ -62,9 +62,10 @@ private ClusterManager() { Cluster asterixCluster = AsterixClusterProperties.INSTANCE.getCluster(); - String eventHome = asterixCluster == null ? null : asterixCluster.getWorkingDir().getDir(); + String eventHome = asterixCluster == null ? null + : asterixCluster.getWorkingDir() == null ? null : asterixCluster.getWorkingDir().getDir(); - if (asterixCluster != null) { + if (eventHome != null) { String asterixDir = System.getProperty("user.dir") + File.separator + "asterix"; File configFile = new File(System.getProperty("user.dir") + File.separator + "configuration.xml"); Configuration configuration = null; @@ -74,8 +75,8 @@ Unmarshaller unmarshaller = configCtx.createUnmarshaller(); configuration = (Configuration) unmarshaller.unmarshal(configFile); AsterixEventService.initialize(configuration, asterixDir, eventHome); - client = AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE - .getCluster()); + client = AsterixEventService + .getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster()); lookupService = ServiceProvider.INSTANCE.getLookupService(); if (!lookupService.isRunning(configuration)) { diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java index 3173525..abfcf67 100644 --- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java +++ 
b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java @@ -371,6 +371,7 @@ partitionRecoveryPlan.get(replica).add(partition.getPartitionId()); } } + break; } } diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java index d2380c1..1ff6cc4 100644 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java @@ -37,7 +37,7 @@ /** * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes */ - public static final String JOB_COMMIT_ACK = "$"; + public static final String JOB_REPLICATION_ACK = "$"; public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES; public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES; @@ -297,11 +297,11 @@ } public static int getJobIdFromLogAckMessage(String msg) { - return Integer.parseInt(msg.substring((msg.indexOf(JOB_COMMIT_ACK) + 1))); + return Integer.parseInt(msg.substring((msg.indexOf(JOB_REPLICATION_ACK) + 1))); } public static String getNodeIdFromLogAckMessage(String msg) { - return msg.substring(0, msg.indexOf(JOB_COMMIT_ACK)); + return msg.substring(0, msg.indexOf(JOB_REPLICATION_ACK)); } /** diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java index f61fbc6..3b2aff7 100644 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java @@ -24,17 +24,8 @@ private String remoteNodeID; private long 
remoteLSN; - private boolean isFlushed = false; private long localLSN; public AtomicInteger numOfFlushedIndexes = new AtomicInteger(); - - public boolean isFlushed() { - return isFlushed; - } - - public void setFlushed(boolean isFlushed) { - this.isFlushed = isFlushed; - } public String getRemoteNodeID() { return remoteNodeID; @@ -66,7 +57,10 @@ sb.append("Remote Node: " + remoteNodeID); sb.append(" Remote LSN: " + remoteLSN); sb.append(" Local LSN: " + localLSN); - sb.append(" isFlushed : " + isFlushed); return sb.toString(); } + + public String getNodeUniqueLSN() { + return TxnLogUtil.getNodeUniqueLSN(remoteNodeID, remoteLSN); + } } diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java new file mode 100644 index 0000000..f51a64d --- /dev/null +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.replication.logging; + +public class TxnLogUtil { + + private TxnLogUtil() { + //prevent util class construction + } + + /** + * @param nodeId + * @param LSN + * @return Concatenation of nodeId and LSN + */ + public static String getNodeUniqueLSN(String nodeId, long LSN) { + return nodeId + LSN; + } +} diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index b9447af..331116b 100644 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -95,10 +95,13 @@ private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider; private final static int INTIAL_BUFFER_SIZE = 4000; //4KB private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ; + private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ; private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap; - private final Map<Long, RemoteLogMapping> localLSN2RemoteLSNMap; + private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping; private final LSMComponentsSyncService lsmComponentLSNMappingService; private final Set<Integer> nodeHostedPartitions; + private final ReplicationNotifier replicationNotifier; + private final Object flushLogslock = new Object(); public ReplicationChannel(String nodeId, AsterixReplicationProperties replicationProperties, ILogManager logManager, IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager, @@ -110,9 +113,11 @@ this.replicationProperties = replicationProperties; this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider; lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new 
LinkedBlockingQueue<LSMComponentLSNSyncTask>(); + pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>(); lsmComponentId2PropertiesMap = new ConcurrentHashMap<String, LSMComponentProperties>(); - localLSN2RemoteLSNMap = new ConcurrentHashMap<Long, RemoteLogMapping>(); + replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>(); lsmComponentLSNMappingService = new LSMComponentsSyncService(); + replicationNotifier = new ReplicationNotifier(); replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory()); Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider .getAppContext()).getMetadataProperties().getNodePartitions(); @@ -140,7 +145,7 @@ dataPort); serverSocketChannel.socket().bind(replicationChannelAddress); lsmComponentLSNMappingService.start(); - + replicationNotifier.start(); LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort); //start accepting replication requests @@ -152,7 +157,7 @@ } } catch (IOException e) { throw new IllegalStateException( - "Could not opened replication channel @ IP Address: " + nodeIP + ":" + dataPort, e); + "Could not open replication channel @ IP Address: " + nodeIP + ":" + dataPort, e); } } @@ -164,13 +169,13 @@ if (remainingFile == 0) { if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null) { //if this LSN wont be used for any other index, remove it - if (localLSN2RemoteLSNMap.containsKey(lsmCompProp.getReplicaLSN())) { - int remainingIndexes = localLSN2RemoteLSNMap.get(lsmCompProp.getReplicaLSN()).numOfFlushedIndexes - .decrementAndGet(); + if (replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) { + int remainingIndexes = replicaUniqueLSN2RemoteMapping + .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet(); if (remainingIndexes == 0) { //Note: there is a chance that this is never deleted because some index in the dataset was 
not flushed because it is empty. //This could be solved by passing only the number of successfully flushed indexes - localLSN2RemoteLSNMap.remove(lsmCompProp.getReplicaLSN()); + replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN()); } } } @@ -180,22 +185,6 @@ lsmComponentId2PropertiesMap.remove(lsmComponentId); LOGGER.log(Level.INFO, "Completed LSMComponent " + lsmComponentId + " Replication."); } - } - - /** - * @param replicaId - * the remote replica id this log belongs to. - * @param remoteLSN - * the remote LSN received from the remote replica. - * @return The local log mapping if found. Otherwise null. - */ - private RemoteLogMapping getRemoteLogMapping(String replicaId, long remoteLSN) { - for (RemoteLogMapping mapping : localLSN2RemoteLSNMap.values()) { - if (mapping.getRemoteLSN() == remoteLSN && mapping.getRemoteNodeID().equals(replicaId)) { - return mapping; - } - } - return null; } @Override @@ -538,56 +527,65 @@ } break; case LogType.JOB_COMMIT: - LogRecord jobCommitLog = new LogRecord(); - TransactionUtil.formJobTerminateLogRecord(jobCommitLog, remoteLog.getJobId(), true); - jobCommitLog.setReplicationThread(this); - jobCommitLog.setLogSource(LogSource.REMOTE); - logManager.log(jobCommitLog); + case LogType.ABORT: + LogRecord jobTerminationLog = new LogRecord(); + TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(), + remoteLog.getLogType() == LogType.JOB_COMMIT); + jobTerminationLog.setReplicationThread(this); + jobTerminationLog.setLogSource(LogSource.REMOTE); + logManager.log(jobTerminationLog); break; case LogType.FLUSH: - LogRecord flushLog = new LogRecord(); - TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null, remoteLog.getNodeId(), - remoteLog.getNumOfFlushedIndexes()); - flushLog.setReplicationThread(this); - flushLog.setLogSource(LogSource.REMOTE); - synchronized (localLSN2RemoteLSNMap) { - logManager.log(flushLog); - //store mapping information for flush logs to use them 
in incoming LSM components. - RemoteLogMapping flushLogMap = new RemoteLogMapping(); - flushLogMap.setRemoteLSN(remoteLog.getLSN()); - flushLogMap.setRemoteNodeID(remoteLog.getNodeId()); - flushLogMap.setLocalLSN(flushLog.getLSN()); - flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes()); - localLSN2RemoteLSNMap.put(flushLog.getLSN(), flushLogMap); - localLSN2RemoteLSNMap.notifyAll(); + //store mapping information for flush logs to use them in incoming LSM components. + RemoteLogMapping flushLogMap = new RemoteLogMapping(); + flushLogMap.setRemoteNodeID(remoteLog.getNodeId()); + flushLogMap.setRemoteLSN(remoteLog.getLSN()); + logManager.log(remoteLog); + //the log LSN value is updated by logManager.log(.) to a local value + flushLogMap.setLocalLSN(remoteLog.getLSN()); + flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes()); + replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap); + synchronized (flushLogslock) { + flushLogslock.notify(); } break; default: - throw new ACIDException("Unsupported LogType: " + remoteLog.getLogType()); + LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType()); } } - //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and FLUSH log types + //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and JOB_ABORT log types. @Override public void notifyLogReplicationRequester(LogRecord logRecord) { - //Note: this could be optimized by moving this to a different thread and freeing the LogPage thread faster - if (logRecord.getLogType() == LogType.JOB_COMMIT) { - //send ACK to requester + pendingNotificationRemoteLogsQ.offer(logRecord); + } + + @Override + public SocketChannel getReplicationClientSocket() { + return socketChannel; + } + } + + /** + * This thread is responsible for sending JOB_COMMIT/ABORT ACKs to replication clients. 
+ */ + private class ReplicationNotifier extends Thread { + @Override + public void run() { + Thread.currentThread().setName("ReplicationNotifier Thread"); + while (true) { try { - socketChannel.socket().getOutputStream() - .write((localNodeID + ReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n") - .getBytes()); - socketChannel.socket().getOutputStream().flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } else if (logRecord.getLogType() == LogType.FLUSH) { - synchronized (localLSN2RemoteLSNMap) { - RemoteLogMapping remoteLogMap = localLSN2RemoteLSNMap.get(logRecord.getLSN()); - synchronized (remoteLogMap) { - remoteLogMap.setFlushed(true); - remoteLogMap.notifyAll(); + LogRecord logRecord = pendingNotificationRemoteLogsQ.take(); + //send ACK to requester + try { + logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream() + .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId() + + System.lineSeparator()).getBytes()); + } catch (IOException e) { + LOGGER.severe("Failed to send job replication ACK " + logRecord.getLogRecordForDisplay()); } + } catch (InterruptedException e1) { + LOGGER.severe("ReplicationNotifier Thread interrupted."); } } } @@ -629,26 +627,23 @@ return; } + //path to the LSM component file + Path path = Paths.get(syncTask.getComponentFilePath()); if (lsmCompProp.getReplicaLSN() == null) { if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) { //need to look up LSN mapping from memory - RemoteLogMapping remoteLogMap = getRemoteLogMapping(lsmCompProp.getNodeId(), remoteLSN); - - //wait until flush log arrives - while (remoteLogMap == null) { - synchronized (localLSN2RemoteLSNMap) { - localLSN2RemoteLSNMap.wait(); - } - remoteLogMap = getRemoteLogMapping(lsmCompProp.getNodeId(), remoteLSN); - } - - //wait until the log is flushed locally before updating the disk component LSN - synchronized (remoteLogMap) { - while (!remoteLogMap.isFlushed()) { - 
remoteLogMap.wait(); + RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()); + if (remoteLogMap == null) { + synchronized (flushLogslock) { + remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()); + //wait until flush log arrives, and verify the LSM component file still exists + //The component file could be deleted if its NC fails. + while (remoteLogMap == null && Files.exists(path)) { + flushLogslock.wait(); + remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()); + } } } - lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN()); } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) { //need to load the LSN mapping from disk @@ -665,13 +660,11 @@ * */ mappingLSN = logManager.getAppendLSN(); - } else { - lsmCompProp.setReplicaLSN(mappingLSN); } + lsmCompProp.setReplicaLSN(mappingLSN); } } - Path path = Paths.get(syncTask.getComponentFilePath()); if (Files.notExists(path)) { /* * This could happen when a merged component arrives and deletes the flushed diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index 7243629..93d1085 100644 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -213,7 +213,7 @@ @Override public void replicateLog(ILogRecord logRecord) { - if (logRecord.getLogType() == LogType.JOB_COMMIT) { + if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { //if replication is suspended, wait until it is resumed. 
while (replicationSuspended.get()) { synchronized (replicationSuspended) { @@ -734,11 +734,7 @@ return true; } else { - if (!replicationJobsPendingAcks.containsKey(logRecord.getJobId())) { - synchronized (replicationJobsPendingAcks) { - replicationJobsPendingAcks.put(logRecord.getJobId(), logRecord); - } - } + replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(), logRecord); return false; } } diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java index 841a99f..9749c7a 100644 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.asterix.replication.logging.TxnLogUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; @@ -96,6 +97,8 @@ public String getMaskPath(ReplicaResourcesManager resourceManager) { if (maskPath == null) { LSMIndexFileProperties afp = new LSMIndexFileProperties(this); + //split the index file path to get the LSM component file name + afp.splitFileName(); maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName() + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX; } @@ -147,10 +150,6 @@ return nodeId; } - public void setNodeId(String nodeId) { - this.nodeId = nodeId; - } - public int getNumberOfFiles() { return numberOfFiles.get(); } @@ -178,4 +177,8 @@ public void setOpType(LSMOperationType opType) { this.opType = opType; } + + public String getNodeUniqueLSN() { + 
return TxnLogUtil.getNodeUniqueLSN(nodeId, originalLSN); + } } diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java index 413fd7a..41fc0b8 100644 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java @@ -67,14 +67,12 @@ String indexPath = getIndexPath(afp); if (indexPath != null) { if (afp.isLSMComponentFile()) { - String backupFilePath = indexPath + File.separator + afp.getFileName(); - - //delete file - File destFile = new File(backupFilePath); + //delete index file + String indexFilePath = indexPath + File.separator + afp.getFileName(); + File destFile = new File(indexFilePath); FileUtils.deleteQuietly(destFile); } else { - //delete index files - indexPath = indexPath.substring(0, indexPath.lastIndexOf(File.separator)); + //delete index directory FileUtils.deleteQuietly(new File(indexPath)); } } diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 6060dd7..502e9c7 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -134,8 +134,7 @@ flushQ.offer(logRecord); } } else if (logRecord.getLogSource() == LogSource.REMOTE) { - - if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.FLUSH) { + if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { remoteJobsQ.offer(logRecord); } } @@ -276,7 +275,7 @@ 
notifyFlushTerminator(); } } else if (logRecord.getLogSource() == LogSource.REMOTE) { - if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.FLUSH) { + if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { notifyReplicationTerminator(); } } diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index 8363ff1..efd66a8 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -72,13 +72,8 @@ //ignore } } - } - } - //wait for job ACK from replicas - //TODO should JOB_ABORT be added as well? - if ((logRecord.getLogType() == LogType.JOB_COMMIT) && !replicationManager.hasBeenReplicated(logRecord)) { - synchronized (logRecord) { + //wait for job Commit/Abort ACK from replicas while (!replicationManager.hasBeenReplicated(logRecord)) { try { logRecord.wait(); -- To view, visit https://asterix-gerrit.ics.uci.edu/743 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I34a38f59c4915a19242adb6a4eaa6ee1c82d2372 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
