This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch Sync-Reconstruct
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/Sync-Reconstruct by this push:
new 3f73140 improve some improper implementations
3f73140 is described below
commit 3f731406e524843fd65d94bedc9d3d8a5bb71f3e
Author: lta <[email protected]>
AuthorDate: Wed Mar 20 22:01:08 2019 +0800
improve some improper implementations
---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 10 +-
.../org/apache/iotdb/db/sync/conf/Constans.java | 5 +
.../iotdb/db/sync/conf/SyncSenderConfig.java | 14 +-
.../iotdb/db/sync/conf/SyncSenderDescriptor.java | 18 +--
.../iotdb/db/sync/receiver/ServerServiceImpl.java | 162 +++++++++++----------
.../iotdb/db/sync/sender/FileSenderImpl.java | 97 ++++++++----
.../org/apache/iotdb/db/utils/FilePathUtils.java | 75 +++++-----
.../java/org/apache/iotdb/db/utils/SyncUtils.java | 23 ---
service-rpc/src/main/thrift/sync.thrift | 5 +-
9 files changed, 215 insertions(+), 194 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8da75f2..7ab5215 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -257,12 +257,10 @@ public class IoTDBDescriptor {
} finally {
// update all data seriesPath
conf.updatePath();
- if (inputStream != null) {
- try {
- inputStream.close();
- } catch (IOException e) {
- LOGGER.error("Fail to close config file input stream because ", e);
- }
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ LOGGER.error("Fail to close config file input stream because ", e);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
index 23c8c54..f43207f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
@@ -39,4 +39,9 @@ public class Constans {
**/
public static final int DATA_CHUNK_SIZE = 64 * 1024 * 1024;
+ /**
+ * Max try when syncing the same file to receiver fails.
+ */
+ public static final int MAX_SYNC_FILE_TRY = 10;
+
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
index a454f50..50bd443 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.sync.conf;
import java.io.File;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.db.utils.SyncUtils;
public class SyncSenderConfig {
@@ -38,10 +41,7 @@ public class SyncSenderConfig {
public void init() {
String metadataDirPath =
IoTDBDescriptor.getInstance().getConfig().getMetadataDir();
- if (metadataDirPath.length() > 0
- && metadataDirPath.charAt(metadataDirPath.length() - 1) !=
File.separatorChar) {
- metadataDirPath = metadataDirPath + File.separatorChar;
- }
+ metadataDirPath = FilePathUtils.regularizePath(metadataDirPath);
schemaPath = metadataDirPath + MetadataConstant.METADATA_LOG;
if (dataDirectory.length() > 0
&& dataDirectory.charAt(dataDirectory.length() - 1) !=
File.separatorChar) {
@@ -55,11 +55,7 @@ public class SyncSenderConfig {
snapshotPaths = new String[bufferwriteDirectory.length];
for (int i = 0; i < bufferwriteDirectory.length; i++) {
bufferwriteDirectory[i] = new
File(bufferwriteDirectory[i]).getAbsolutePath();
- if (bufferwriteDirectory[i].length() > 0
- && bufferwriteDirectory[i].charAt(bufferwriteDirectory[i].length() -
1)
- != File.separatorChar) {
- bufferwriteDirectory[i] = bufferwriteDirectory[i] + File.separatorChar;
- }
+ bufferwriteDirectory[i] =
FilePathUtils.regularizePath(bufferwriteDirectory[i]);
snapshotPaths[i] = bufferwriteDirectory[i] + Constans.SYNC_CLIENT +
File.separatorChar
+ Constans.DATA_SNAPSHOT_NAME
+ File.separatorChar;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
index 9a8ed5d..99dcb56 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.utils.SyncUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,11 +107,7 @@ public class SyncSenderDescriptor {
String[] iotdbBufferwriteDirectory = conf.getBufferwriteDirectory();
String[] snapshots = new String[conf.getBufferwriteDirectory().length];
for (int i = 0; i < conf.getBufferwriteDirectory().length; i++) {
- if (iotdbBufferwriteDirectory[i].length() > 0
- &&
iotdbBufferwriteDirectory[i].charAt(iotdbBufferwriteDirectory[i].length() - 1)
- != File.separatorChar) {
- iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] +
File.separatorChar;
- }
+ iotdbBufferwriteDirectory[i] =
FilePathUtils.regularizePath(iotdbBufferwriteDirectory[i]);
snapshots[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC_CLIENT +
File.separatorChar
+ Constans.DATA_SNAPSHOT_NAME + File.separatorChar;
}
@@ -120,12 +118,10 @@ public class SyncSenderDescriptor {
} catch (Exception e) {
LOGGER.warn("Error format in config file because {}, use default
configuration", e);
} finally {
- if (inputStream != null) {
- try {
- inputStream.close();
- } catch (IOException e) {
- LOGGER.error("Fail to close sync config file input stream because ",
e);
- }
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ LOGGER.error("Fail to close sync config file input stream because ",
e);
}
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
index fa072cb..7c938b3 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
@@ -29,14 +29,16 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -54,6 +56,7 @@ import org.apache.iotdb.db.metadata.MetadataOperationType;
import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.sync.conf.Constans;
+import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.SyncUtils;
import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
import org.apache.iotdb.service.sync.thrift.SyncService;
@@ -137,10 +140,10 @@ public class ServerServiceImpl implements
SyncService.Iface {
private String syncDataPath;
/**
- * Init threadLocal variable
+ * Init threadLocal variable and delete old useless files.
*/
@Override
- public void init(String storageGroup) {
+ public boolean init(String storageGroup) {
if (logger.isInfoEnabled()) {
logger.info("Sync process starts to receive data of storage group {}",
storageGroup);
}
@@ -148,6 +151,26 @@ public class ServerServiceImpl implements
SyncService.Iface {
fileNodeMap.set(new HashMap<>());
fileNodeStartTime.set(new HashMap<>());
fileNodeEndTime.set(new HashMap<>());
+ try {
+ FileUtils.deleteDirectory(new File(syncDataPath));
+ } catch (IOException e) {
+ logger.error("cannot delete directory {} ", syncFolderPath);
+ return false;
+ }
+ for (String bufferWritePath : bufferWritePaths) {
+ bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
+ String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
+ File backupDirectory = new File(backupPath, this.uuid.get());
+ if (backupDirectory.exists() && backupDirectory.list().length != 0) {
+ try {
+ FileUtils.deleteDirectory(backupDirectory);
+ } catch (IOException e) {
+ logger.error("cannot delete directory {} ", syncFolderPath);
+ return false;
+ }
+ }
+ }
+ return true;
}
/**
@@ -165,54 +188,38 @@ public class ServerServiceImpl implements
SyncService.Iface {
* Init file path and clear data if last sync process failed.
*/
private void initPath() {
- if (dataPath.length() > 0 && dataPath.charAt(dataPath.length() - 1) !=
File.separatorChar) {
- dataPath = dataPath + File.separatorChar;
- }
+ dataPath = FilePathUtils.regularizePath(dataPath);
syncFolderPath = dataPath + SYNC_SERVER + File.separatorChar +
this.uuid.get();
syncDataPath = syncFolderPath + File.separatorChar +
Constans.DATA_SNAPSHOT_NAME;
schemaFromSenderPath
.set(syncFolderPath + File.separator + MetadataConstant.METADATA_LOG);
- File syncFileDirectory = new File(syncFolderPath, this.uuid.get());
- if (syncFileDirectory.exists()
- && Objects.requireNonNull(syncFileDirectory.list()).length != 0) {
- SyncUtils.deleteFile(syncFileDirectory);
- }
- for (String bufferWritePath : bufferWritePaths) {
- if (bufferWritePath.length() > 0
- && bufferWritePath.charAt(bufferWritePath.length() - 1) !=
File.separatorChar) {
- bufferWritePath = bufferWritePath + File.separatorChar;
- }
- String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
- File backupDirectory = new File(backupPath, this.uuid.get());
- if (backupDirectory.exists() &&
Objects.requireNonNull(backupDirectory.list()).length != 0) {
- /** if does not exist, it means that the last time sync failed, clear
data in the uuid directory and receive the data again **/
- SyncUtils.deleteFile(backupDirectory);
- }
- }
}
/**
* Acquire schema from sender
*
- * @param status: SUCCESS_STATUS or PROCESSING_STATUS. status =
SUCCESS_STATUS : finish receiving
- * schema file, start to sync schema. status = SUCCESS_STATUS : the schema
file has not received
- * completely.
+ * @param status: FINIFSH_STATUS, SUCCESS_STATUS or PROCESSING_STATUS.
status = FINISH_STATUS :
+ * finish receiving schema file, start to sync schema. status =
PROCESSING_STATUS : the schema
+ * file has not received SUCCESS_STATUS: load metadata completely.
*/
@Override
- public void syncSchema(ByteBuffer schema, SyncDataStatus status) {
+ public String syncSchema(String md5, ByteBuffer schema, SyncDataStatus
status) {
+ String md5OfReceiver = "";
if (status == SyncDataStatus.SUCCESS_STATUS) {
/** sync metadata, include storage group and timeseries **/
- loadMetadata();
- } else {
+ return Boolean.toString(loadMetadata());
+ } else if (status == SyncDataStatus.PROCESSING_STATUS) {
File file = new File(schemaFromSenderPath.get());
if (!file.getParentFile().exists()) {
try {
file.getParentFile().mkdirs();
if (!file.createNewFile()) {
logger.error("Cannot create file {}", file.getPath());
+ return null;
}
} catch (IOException e) {
logger.error("Cannot make schema file {}.", file.getPath(), e);
+ return null;
}
}
try (FileOutputStream fos = new FileOutputStream(file, true);
@@ -220,15 +227,31 @@ public class ServerServiceImpl implements
SyncService.Iface {
channel.write(schema);
} catch (Exception e) {
logger.error("Cannot write data to file {}.", file.getPath(), e);
+ return null;
+ }
+ } else {
+ try (FileInputStream fis = new
FileInputStream(schemaFromSenderPath.get())) {
+ MessageDigest md = MessageDigest.getInstance("MD5");
+ byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+ int n;
+ while ((n = fis.read(buffer)) != -1) {
+ md.update(buffer, 0, n);
+ }
+ md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
+ if (!md5.equals(md5OfReceiver)) {
+ FileUtils.forceDelete(new File(schemaFromSenderPath.get()));
+ }
+ } catch (Exception e) {
+ logger.error("Receiver cannot generate md5 {}",
schemaFromSenderPath.get(), e);
}
}
-
+ return md5OfReceiver;
}
/**
* Load metadata from sender
*/
- private void loadMetadata() {
+ private boolean loadMetadata() {
if (new File(schemaFromSenderPath.get()).exists()) {
try (BufferedReader br = new BufferedReader(
new java.io.FileReader(schemaFromSenderPath.get()))) {
@@ -239,12 +262,15 @@ public class ServerServiceImpl implements
SyncService.Iface {
} catch (FileNotFoundException e) {
logger.error("Cannot read the file {}.",
schemaFromSenderPath.get(), e);
+ return false;
} catch (IOException e) {
//TODO: how to deal with multiple insert schema
} catch (Exception e) {
logger.error("Parse metadata operation failed.", e);
+ return false;
}
}
+ return true;
}
/**
@@ -257,7 +283,7 @@ public class ServerServiceImpl implements SyncService.Iface
{
String[] args = cmd.trim().split(",");
switch (args[0]) {
case MetadataOperationType.ADD_PATH_TO_MTREE:
- Map<String, String> props = null;
+ Map<String, String> props;
String[] kv;
props = new HashMap<>(args.length - 5 + 1, 1);
for (int k = 5; k < args.length; k++) {
@@ -305,30 +331,17 @@ public class ServerServiceImpl implements
SyncService.Iface {
public String syncData(String md5OfSender, List<String> filePathSplit,
ByteBuffer dataToReceive, SyncDataStatus status) {
String md5OfReceiver = "";
- StringBuilder filePathBuilder = new StringBuilder();
FileChannel channel;
/** Recombination File Path **/
- for (int i = 0; i < filePathSplit.size(); i++) {
- if (i == filePathSplit.size() - 1) {
- filePathBuilder.append(filePathSplit.get(i));
- } else {
- filePathBuilder.append(filePathSplit.get(i)).append(File.separator);
- }
- }
- String filePath = filePathBuilder.toString();
- if (syncDataPath.length() > 0
- && syncDataPath.charAt(syncDataPath.length() - 1) !=
File.separatorChar) {
- syncDataPath = syncDataPath + File.separatorChar;
- }
+ String filePath = StringUtils.join(filePathSplit, File.separatorChar);
+ syncDataPath = FilePathUtils.regularizePath(syncDataPath);
filePath = syncDataPath + filePath;
if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data
stream to add
File file = new File(filePath);
if (!file.getParentFile().exists()) {
try {
file.getParentFile().mkdirs();
- if (!file.createNewFile()) {
- logger.error("cannot create file {}", file.getPath());
- }
+ file.createNewFile();
} catch (IOException e) {
logger.error("cannot make file {}", file.getPath(), e);
}
@@ -355,12 +368,10 @@ public class ServerServiceImpl implements
SyncService.Iface {
logger.info(String.format("Receiver has received %d files from
sender", fileNum.get()));
}
} else {
- if (!new File(filePath).delete()) {
- logger.error("Receiver can not delete file {}", new
File(filePath).getPath());
- }
+ FileUtils.forceDelete(new File(filePath));
}
} catch (Exception e) {
- logger.error("Receiver cannot generate md5", e);
+ logger.error("Receiver cannot generate md5 {}", filePath, e);
}
}
return md5OfReceiver;
@@ -369,27 +380,20 @@ public class ServerServiceImpl implements
SyncService.Iface {
@Override
public boolean load() {
- getFileNodeInfo();
- loadData();
- SyncUtils.deleteFile(new File(syncDataPath));
- for (String bufferWritePath : bufferWritePaths) {
- if (bufferWritePath.length() > 0
- && bufferWritePath.charAt(bufferWritePath.length() - 1) !=
File.separatorChar) {
- bufferWritePath = bufferWritePath + File.separatorChar;
- }
- String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
- File backupDirectory = new File(backupPath, this.uuid.get());
- if (backupDirectory.exists() &&
Objects.requireNonNull(backupDirectory.list()).length != 0) {
- SyncUtils.deleteFile(backupDirectory);
- }
+ try {
+ getFileNodeInfo();
+ loadData();
+ } catch (Exception e) {
+ logger.error("fail to load data", e);
+ return false;
}
return true;
}
/**
- * Get all tsfiles' info which are sent from sender, it is prepare for
merging these data
+ * Get all tsfiles' info which are sent from sender, it is preparing for
merging these data
*/
- public void getFileNodeInfo() {
+ public void getFileNodeInfo() throws IOException {
File dataFileRoot = new File(syncDataPath);
File[] files = dataFileRoot.listFiles();
int processedNum = 0;
@@ -410,15 +414,17 @@ public class ServerServiceImpl implements
SyncService.Iface {
startTimeMap.put(key, device.getStartTime());
endTimeMap.put(key, device.getEndTime());
}
- } catch (Exception e) {
- logger.error("Unable to read tsfile {}", fileTF.getPath(), e);
+ } catch (IOException e) {
+ logger.error("Unable to read tsfile {}", fileTF.getPath());
+ throw new IOException(e);
} finally {
try {
if (reader != null) {
reader.close();
}
} catch (IOException e) {
- logger.error("Cannot close tsfile stream {}", fileTF.getPath(), e);
+ logger.error("Cannot close tsfile stream {}", fileTF.getPath());
+ throw new IOException(e);
}
}
fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
@@ -440,11 +446,8 @@ public class ServerServiceImpl implements
SyncService.Iface {
* directly. If data in the tsfile is old, it has two strategy to merge.It
depends on the
* possibility of updating historical data.
*/
- public void loadData() {
- if (syncDataPath.length() > 0
- && syncDataPath.charAt(syncDataPath.length() - 1) !=
File.separatorChar) {
- syncDataPath = syncDataPath + File.separatorChar;
- }
+ public void loadData() throws FileNodeManagerException {
+ syncDataPath = FilePathUtils.regularizePath(syncDataPath);
int processedNum = 0;
for (String storageGroup : fileNodeMap.get().keySet()) {
List<String> filesPath = fileNodeMap.get().get(storageGroup);
@@ -500,7 +503,8 @@ public class ServerServiceImpl implements SyncService.Iface
{
}
}
} catch (FileNodeManagerException e) {
- logger.error("Can not load external file ", e);
+ logger.error("Can not load external file {}", path);
+ throw new FileNodeManagerException(e);
}
processedNum++;
@@ -725,7 +729,11 @@ public class ServerServiceImpl implements
SyncService.Iface {
fileNodeStartTime.remove();
fileNodeEndTime.remove();
schemaFromSenderPath.remove();
- SyncUtils.deleteFile(new File(syncFolderPath));
+ try {
+ FileUtils.deleteDirectory(new File(syncFolderPath));
+ } catch (IOException e) {
+ logger.error("can not delete directory {}", syncFolderPath, e);
+ }
logger.info("Synchronization has finished!");
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
index 5090fb1..bcd1b20 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
@@ -42,6 +42,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.sync.conf.Constans;
@@ -178,7 +179,7 @@ public class FileSenderImpl implements FileSender {
for (String snapshotPath : config.getSnapshotPaths()) {
if (new File(snapshotPath).exists() && new
File(snapshotPath).list().length != 0) {
/** It means that the last task of sync does not succeed! Clear the
files and start to sync again **/
- SyncUtils.deleteFile(new File(snapshotPath));
+ FileUtils.deleteDirectory(new File(snapshotPath));
}
}
@@ -195,7 +196,7 @@ public class FileSenderImpl implements FileSender {
establishConnection(config.getServerIp(), config.getServerPort());
if (!confirmIdentity(config.getUuidPath())) {
LOGGER.error("Sorry, you do not have the permission to connect to sync
receiver.");
- return;
+ System.exit(1);
}
// 4. Create snapshot
@@ -205,15 +206,21 @@ public class FileSenderImpl implements FileSender {
syncStatus = true;
- // 5. Sync schema
- syncSchema();
+ try{
+ // 5. Sync schema
+ syncSchema();
- // 6. Sync data
- syncAllData();
+ // 6. Sync data
+ syncAllData();
+ }catch (SyncConnectionException e){
+ LOGGER.error("cannot finish sync process", e);
+ syncStatus = false;
+ return;
+ }
// 7. clear snapshot
for (String snapshotPath : config.getSnapshotPaths()) {
- SyncUtils.deleteFile(new File(snapshotPath));
+ FileUtils.deleteDirectory(new File(snapshotPath));
}
// 8. notify receiver that synchronization finish
@@ -238,7 +245,9 @@ public class FileSenderImpl implements FileSender {
}
LOGGER.info("Sync process starts to transfer data of storage group {}",
entry.getKey());
try {
- serviceClient.init(entry.getKey());
+ if(!serviceClient.init(entry.getKey())){
+ throw new SyncConnectionException("unable init receiver");
+ }
} catch (TException e) {
throw new SyncConnectionException("Unable to connect to receiver", e);
}
@@ -249,8 +258,7 @@ public class FileSenderImpl implements FileSender {
fileManager.backupNowLocalFileInfo(config.getLastFileInfo());
LOGGER.info("Sync process has finished storage group {}.",
entry.getKey());
} else {
- throw new SyncConnectionException(
- "Receiver cannot sync data, abandon this synchronization");
+ LOGGER.error("Receiver cannot sync data, abandon this synchronization
of storage group {}", entry.getKey());
}
}
}
@@ -365,8 +373,14 @@ public class FileSenderImpl implements FileSender {
filePathSplit.add(name[name.length - 2]);
filePathSplit.add(name[name.length - 1]);
}
+ int retryCount = 0;
while (true) {
// Sync all data to receiver
+ if(retryCount > Constans.MAX_SYNC_FILE_TRY){
+ throw new SyncConnectionException(String
+ .format("can not sync file %s after %s tries.",
snapshotFilePath,
+ Constans.MAX_SYNC_FILE_TRY));
+ }
byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
int data;
try (FileInputStream fis = new FileInputStream(file)) {
@@ -392,11 +406,12 @@ public class FileSenderImpl implements FileSender {
// the file is sent successfully
String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
String md5OfReceiver = serviceClient.syncData(md5OfSender,
filePathSplit,
- null, SyncDataStatus.SUCCESS_STATUS);
+ null, SyncDataStatus.FINISH_STATUS);
if (md5OfSender.equals(md5OfReceiver)) {
LOGGER.info("Receiver has received {} successfully.",
snapshotFilePath);
break;
}
+ retryCount++;
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info(String.format("Task of synchronization has completed
%d/%d.", successNum,
@@ -413,24 +428,48 @@ public class FileSenderImpl implements FileSender {
*/
@Override
public void syncSchema() throws SyncConnectionException {
- try (FileInputStream fis = new FileInputStream(new
File(config.getSchemaPath()))) {
- int mBufferSize = 4 * 1024 * 1024;
- ByteArrayOutputStream bos = new ByteArrayOutputStream(mBufferSize);
- byte[] buffer = new byte[mBufferSize];
- int n;
- while ((n = fis.read(buffer)) != -1) { // cut the file into pieces to
send
- bos.write(buffer, 0, n);
- ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
- bos.reset();
- // 1 represents there is still schema buffer to send.
- serviceClient.syncSchema(buffToSend, SyncDataStatus.PROCESSING_STATUS);
+ int retryCount = 0;
+ while (true) {
+ if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
+ throw new SyncConnectionException(String
+ .format("can not sync schema after %s tries.",
Constans.MAX_SYNC_FILE_TRY));
+ }
+ try (FileInputStream fis = new FileInputStream(new
File(config.getSchemaPath()))) {
+ int mBufferSize = 4 * 1024 * 1024;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(mBufferSize);
+ byte[] buffer = new byte[mBufferSize];
+ int n;
+ while ((n = fis.read(buffer)) != -1) { // cut the file into pieces to
send
+ bos.write(buffer, 0, n);
+ ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+ bos.reset();
+ // PROCESSING_STATUS represents there is still schema buffer to send.
+ serviceClient.syncSchema(null, buffToSend,
SyncDataStatus.PROCESSING_STATUS);
+ }
+ bos.close();
+ // Get md5 of the file.
+ fis.reset();
+ MessageDigest md = MessageDigest.getInstance("MD5");
+ int data;
+ while ((data = fis.read(buffer)) != -1) {
+ md.update(buffer, 0, data);
+ }
+ String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
+ String md5OfReceiver = serviceClient
+ .syncSchema(md5OfSender, null, SyncDataStatus.FINISH_STATUS);
+ if (md5OfSender.equals(md5OfReceiver)) {
+ LOGGER.info("Receiver has received schema successfully.");
+ /** receiver start to load metadata **/
+ if(Boolean.parseBoolean(serviceClient.syncSchema(null, null,
SyncDataStatus.SUCCESS_STATUS))){
+ throw new SyncConnectionException("receiver failed to load
metadata");
+ }
+ break;
+ }
+ retryCount++;
+ }catch (Exception e) {
+ LOGGER.error("Cannot sync schema ", e);
+ throw new SyncConnectionException(e);
}
- bos.close();
- // 0 represents the schema file has been transferred completely.
- serviceClient.syncSchema(null, SyncDataStatus.SUCCESS_STATUS);
- } catch (Exception e) {
- LOGGER.error("Cannot sync schema ", e);
- throw new SyncConnectionException(e);
}
}
@@ -479,7 +518,7 @@ public class FileSenderImpl implements FileSender {
try {
fileLock.release();
randomAccessFile.close();
- file.delete();
+ FileUtils.forceDelete(file);
} catch (Exception e) {
LOGGER.error("Unable to remove lock file: {}", lockFile, e);
}
diff --git a/service-rpc/src/main/thrift/sync.thrift
b/iotdb/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
old mode 100755
new mode 100644
similarity index 62%
copy from service-rpc/src/main/thrift/sync.thrift
copy to iotdb/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index e139783..a797b5c
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -1,37 +1,38 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-namespace java org.apache.iotdb.service.sync.thrift
-
-typedef i32 int
-typedef i16 short
-typedef i64 long
-
-enum SyncDataStatus {
- SUCCESS_STATUS,
- PROCESSING_STATUS
-}
-
-service SyncService{
- bool checkIdentity(1:string uuid, 2:string address)
- void syncSchema(1:binary buff, 2:SyncDataStatus status)
- string syncData(1:string md5, 2:list<string> filename, 3:binary buff,
4:SyncDataStatus status)
- bool load()
- void cleanUp()
- void init(1:string storageGroup)
-}
\ No newline at end of file
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import java.io.File;
+
+public class FilePathUtils {
+
+ /**
+ * Format file path to end with File.separator
+ * @param filePath origin file path
+ * @return Regularized Path
+ */
+ public static String regularizePath(String filePath){
+ if (filePath.length() > 0
+ && filePath.charAt(filePath.length() - 1) != File.separatorChar) {
+ filePath = filePath + File.separatorChar;
+ }
+ return filePath;
+ }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
b/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 904b22c..d395038 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.text.DecimalFormat;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Set;
import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
@@ -124,26 +123,4 @@ public class SyncUtils {
ipAddressBinary = ipAddressBinary.substring(0, subnetMark);
return ipAddressBinary.equals(ipSegmentBinary);
}
-
- /**
- * Remove all files under this folder recursively
- *
- * @param file folder file
- */
- public static void deleteFile(File file) {
- if (!file.exists()) {
- return;
- }
- if (file.isFile() || Objects.requireNonNull(file.list()).length == 0) {
- file.delete();
- } else {
- File[] files = file.listFiles();
- assert files != null;
- for (File f : files) {
- deleteFile(f);
- f.delete();
- }
- file.delete();
- }
- }
}
diff --git a/service-rpc/src/main/thrift/sync.thrift
b/service-rpc/src/main/thrift/sync.thrift
index e139783..56b2774 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -24,14 +24,15 @@ typedef i64 long
enum SyncDataStatus {
SUCCESS_STATUS,
+ FINISH_STATUS,
PROCESSING_STATUS
}
service SyncService{
bool checkIdentity(1:string uuid, 2:string address)
- void syncSchema(1:binary buff, 2:SyncDataStatus status)
+ string syncSchema(1:string md5, 2:binary buff, 3:SyncDataStatus status)
string syncData(1:string md5, 2:list<string> filename, 3:binary buff,
4:SyncDataStatus status)
bool load()
void cleanUp()
- void init(1:string storageGroup)
+ bool init(1:string storageGroup)
}
\ No newline at end of file