This is an automated email from the ASF dual-hosted git repository.
liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7dd3722 [IOTDB-8] Fix filenode restore deserialization (#96)
7dd3722 is described below
commit 7dd3722b713ddfff37c70d9b11a50dbe7388b77e
Author: Kun Liu <[email protected]>
AuthorDate: Fri Mar 22 13:37:20 2019 +0800
[IOTDB-8] Fix filenode restore deserialization (#96)
---
.../db/engine/filenode/FileNodeProcessor.java | 39 +++---
.../engine/filenode/FileNodeProcessorStatus.java | 32 ++++-
.../db/engine/filenode/FileNodeProcessorStore.java | 55 ++++++++-
.../db/engine/filenode/OverflowChangeType.java | 31 ++++-
.../iotdb/db/engine/filenode/SerializeUtil.java | 72 -----------
.../iotdb/db/engine/filenode/TsFileResource.java | 101 ++++++++++++---
.../filenode/FileNodeProcessorStoreTest.java | 91 ++++++++++++++
.../db/engine/filenode/SerializeUtilTest.java | 136 ---------------------
.../db/engine/filenode/TsFileResourceTest.java | 96 +++++++++++++++
9 files changed, 401 insertions(+), 252 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 2c770a5..9f9637d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,10 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.db.engine.filenode;
+import static java.time.ZonedDateTime.ofInstant;
+
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
@@ -41,7 +44,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -102,8 +104,6 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.time.ZonedDateTime.ofInstant;
-
public class FileNodeProcessor extends Processor implements IStatistic {
private static final String WARN_NO_SUCH_OVERFLOWED_FILE = "Can not find any
tsfile which"
@@ -725,8 +725,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
newMultiPassLock.readLock().unlock();
newMultiPassTokenSet.remove(token);
LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}",
token,
- getProcessorName(),
- newMultiPassTokenSet, newMultiPassLock);
+ getProcessorName(),
+ newMultiPassTokenSet, newMultiPassLock);
return true;
} else if (oldMultiPassTokenSet != null &&
oldMultiPassTokenSet.contains(token)) {
// remove token first, then unlock
@@ -1796,9 +1796,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
throws FileNodeProcessorException {
synchronized (fileNodeRestoreLock) {
- SerializeUtil<FileNodeProcessorStore> serializeUtil = new
SerializeUtil<>();
- try {
- serializeUtil.serialize(fileNodeProcessorStore,
fileNodeRestoreFilePath);
+ try (FileOutputStream fileOutputStream = new
FileOutputStream(fileNodeRestoreFilePath)) {
+ fileNodeProcessorStore.serialize(fileOutputStream);
LOGGER.debug("The filenode processor {} writes restore information to
the restore file",
getProcessorName());
} catch (IOException e) {
@@ -1810,17 +1809,21 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private FileNodeProcessorStore readStoreFromDisk() throws
FileNodeProcessorException {
synchronized (fileNodeRestoreLock) {
- FileNodeProcessorStore processorStore;
- SerializeUtil<FileNodeProcessorStore> serializeUtil = new
SerializeUtil<>();
- try {
- processorStore = serializeUtil.deserialize(fileNodeRestoreFilePath)
- .orElse(new FileNodeProcessorStore(false, new HashMap<>(),
- new TsFileResource(OverflowChangeType.NO_CHANGE, null),
- new ArrayList<>(), FileNodeProcessorStatus.NONE, 0));
+
+ File restoreFile = new File(fileNodeRestoreFilePath);
+ if (!restoreFile.exists() || restoreFile.length() == 0) {
+ return new FileNodeProcessorStore(false, new HashMap<>(),
+ new TsFileResource(OverflowChangeType.NO_CHANGE, null),
+ new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
+ }
+ try (FileInputStream inputStream = new
FileInputStream(fileNodeRestoreFilePath)) {
+ return FileNodeProcessorStore.deSerialize(inputStream);
} catch (IOException e) {
+ LOGGER
+ .error("Failed to deserialize the FileNodeRestoreFile {}, {}",
fileNodeRestoreFilePath,
+ e);
throw new FileNodeProcessorException(e);
}
- return processorStore;
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStatus.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStatus.java
index 067d3a5..b3ce1e5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStatus.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStatus.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,5 +19,33 @@
package org.apache.iotdb.db.engine.filenode;
public enum FileNodeProcessorStatus {
- NONE, MERGING_WRITE, WAITING
+ NONE, MERGING_WRITE, WAITING;
+
+ public static FileNodeProcessorStatus deserialize(short i) {
+ switch (i) {
+ case 0:
+ return NONE;
+ case 1:
+ return MERGING_WRITE;
+ case 2:
+ return WAITING;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Invalid input %d for FileNodeProcessorStatus", i));
+ }
+ }
+
+ public short serialize() {
+ switch (this) {
+ case NONE:
+ return 0;
+ case MERGING_WRITE:
+ return 1;
+ case WAITING:
+ return 2;
+ default:
+ throw new IllegalStateException("Unsupported type");
+ }
+
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
index db56364..7c1eca4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,10 +18,17 @@
*/
package org.apache.iotdb.db.engine.filenode;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
/**
* FileNodeProcessorStore is used to store information about
FileNodeProcessor's status.
@@ -30,8 +37,6 @@ import java.util.Map;
* Overflow close. fileNodeProcessorState is changed and stored by the change
of FileNodeProcessor's
* status such as "work->merge merge->wait wait->work". numOfMergeFile is
changed
* and stored when FileNodeProcessor's status changes from work to merge.
- *
- * @author liukun
*/
public class FileNodeProcessorStore implements Serializable {
@@ -46,6 +51,7 @@ public class FileNodeProcessorStore implements Serializable {
/**
* Constructor of FileNodeProcessorStore.
+ *
* @param isOverflowed whether this FileNode contains unmerged Overflow
operations.
* @param lastUpdateTimeMap the timestamp of last data point of each device
in this FileNode.
* @param emptyTsFileResource a place holder when the FileNode contains no
TsFile.
@@ -66,6 +72,49 @@ public class FileNodeProcessorStore implements Serializable {
this.numOfMergeFile = numOfMergeFile;
}
+ public void serialize(OutputStream outputStream) throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ ReadWriteIOUtils.write(this.isOverflowed, byteArrayOutputStream);
+ // lastUpdateTimeMap
+ ReadWriteIOUtils.write(lastUpdateTimeMap.size(), byteArrayOutputStream);
+ for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), byteArrayOutputStream);
+ ReadWriteIOUtils.write(entry.getValue(), byteArrayOutputStream);
+ }
+ this.emptyTsFileResource.serialize(byteArrayOutputStream);
+ ReadWriteIOUtils.write(this.newFileNodes.size(), byteArrayOutputStream);
+ for (TsFileResource tsFileResource : this.newFileNodes) {
+ tsFileResource.serialize(byteArrayOutputStream);
+ }
+ ReadWriteIOUtils.write(this.numOfMergeFile, byteArrayOutputStream);
+ ReadWriteIOUtils.write(this.fileNodeProcessorStatus.serialize(),
byteArrayOutputStream);
+ // buffer array to outputstream
+ byteArrayOutputStream.writeTo(outputStream);
+ }
+
+ public static FileNodeProcessorStore deSerialize(InputStream inputStream)
throws IOException {
+ boolean isOverflowed = ReadWriteIOUtils.readBool(inputStream);
+ Map<String, Long> lastUpdateTimeMap = new HashMap<>();
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; i++) {
+ String path = ReadWriteIOUtils.readString(inputStream);
+ long time = ReadWriteIOUtils.readLong(inputStream);
+ lastUpdateTimeMap.put(path, time);
+ }
+ TsFileResource emptyTsFileResource =
TsFileResource.deSerialize(inputStream);
+ size = ReadWriteIOUtils.readInt(inputStream);
+ List<TsFileResource> newFileNodes = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ newFileNodes.add(TsFileResource.deSerialize(inputStream));
+ }
+ int numOfMergeFile = ReadWriteIOUtils.readInt(inputStream);
+ FileNodeProcessorStatus fileNodeProcessorStatus = FileNodeProcessorStatus
+ .deserialize(ReadWriteIOUtils.readShort(inputStream));
+
+ return new FileNodeProcessorStore(isOverflowed, lastUpdateTimeMap,
emptyTsFileResource,
+ newFileNodes, fileNodeProcessorStatus, numOfMergeFile);
+ }
+
public boolean isOverflowed() {
return isOverflowed;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
index 5bbf034..48c8eee 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -26,5 +26,32 @@ package org.apache.iotdb.db.engine.filenode;
* After merging, if it's MERGING_CHANGE, MERGING_CHANGE-->CHANGED, otherwise
in NO_CHANGE, MERGING_CHANGE-->NO_CHANGE
*/
public enum OverflowChangeType {
- NO_CHANGE, CHANGED, MERGING_CHANGE,
+ NO_CHANGE, CHANGED, MERGING_CHANGE;
+
+ public short serialize() {
+ switch (this) {
+ case NO_CHANGE:
+ return 0;
+ case CHANGED:
+ return 1;
+ case MERGING_CHANGE:
+ return 2;
+ default:
+ throw new IllegalStateException("Unsupported type");
+ }
+ }
+
+ public static OverflowChangeType deserialize(short i) {
+ switch (i) {
+ case 0:
+ return NO_CHANGE;
+ case 1:
+ return CHANGED;
+ case 2:
+ return MERGING_CHANGE;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Invalid input %d for OverflowChangeType", i));
+ }
+ }
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/SerializeUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/SerializeUtil.java
deleted file mode 100644
index e11eaf4..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/SerializeUtil.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Optional;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is used to serialize or deserialize the object T.
- */
-public class SerializeUtil<T> {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(SerializeUtil.class);
-
- /**
- * serialize obj and write to filePath.
- */
- public void serialize(Object obj, String filePath) throws IOException {
- try (FileOutputStream fileOutputStream = new FileOutputStream(filePath);
- ObjectOutputStream oos = new ObjectOutputStream(fileOutputStream)) {
- oos.writeObject(obj);
- oos.flush();
- } catch (IOException e) {
- LOGGER.error("Serizelize the object failed.", e);
- throw e;
- }
- }
-
- /**
- * deserialize obj from filePath.
- */
- public Optional<T> deserialize(String filePath) throws IOException {
- File file = new File(filePath);
- if (!file.exists()) {
- return Optional.empty();
- }
- T result;
- try (FileInputStream fis = new FileInputStream(file);
- ObjectInputStream ois = new ObjectInputStream(fis)) {
- result = (T) ois.readObject();
- } catch (Exception e) {
- LOGGER.error("Deserialize the object error.", e);
- return Optional.empty();
- }
- return Optional.ofNullable(result);
- }
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
index 5f0b376..2806256 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
@@ -19,22 +19,23 @@
package org.apache.iotdb.db.engine.filenode;
import java.io.File;
-import java.io.Serializable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
-
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
/**
* This class is used to store one bufferwrite file status.<br>
*/
-public class TsFileResource implements Serializable {
-
- private static final long serialVersionUID = -4309683416067212549L;
+public class TsFileResource {
private OverflowChangeType overflowChangeType;
private int baseDirIndex;
@@ -46,7 +47,7 @@ public class TsFileResource implements Serializable {
private transient ModificationFile modFile;
public TsFileResource(Map<String, Long> startTimeMap, Map<String, Long>
endTimeMap,
- OverflowChangeType type, int baseDirIndex, String
relativePath) {
+ OverflowChangeType type, int baseDirIndex, String relativePath) {
this.overflowChangeType = type;
this.baseDirIndex = baseDirIndex;
@@ -62,7 +63,7 @@ public class TsFileResource implements Serializable {
/**
* This is just used to construct a new bufferwritefile.
*
- * @param type whether this file is affected by overflow and how it
is affected.
+ * @param type whether this file is affected by overflow and how it is
affected.
* @param relativePath the path of the file relative to the FileNode.
*/
public TsFileResource(OverflowChangeType type, int baseDirIndex, String
relativePath) {
@@ -96,6 +97,64 @@ public class TsFileResource implements Serializable {
this(type, 0, relativePath);
}
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(this.overflowChangeType.serialize(), outputStream);
+ ReadWriteIOUtils.write(this.baseDirIndex, outputStream);
+ ReadWriteIOUtils.writeIsNull(this.relativePath, outputStream);
+ if (this.relativePath != null) {
+ ReadWriteIOUtils.write(this.relativePath, outputStream);
+ }
+ ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
+ for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ ReadWriteIOUtils.write(this.endTimeMap.size(), outputStream);
+ for (Entry<String, Long> entry : this.endTimeMap.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ ReadWriteIOUtils.write(mergeChanged.size(), outputStream);
+ for (String mergeChangedElement : this.mergeChanged) {
+ ReadWriteIOUtils.write(mergeChangedElement, outputStream);
+ }
+ }
+
+ public static TsFileResource deSerialize(InputStream inputStream) throws
IOException {
+ OverflowChangeType overflowChangeType = OverflowChangeType
+ .deserialize(ReadWriteIOUtils.readShort(inputStream));
+ int baseDirIndex = ReadWriteIOUtils.readInt(inputStream);
+ boolean hasRelativePath = ReadWriteIOUtils.readIsNull(inputStream);
+ String relativePath = null;
+ if (hasRelativePath) {
+ relativePath = ReadWriteIOUtils.readString(inputStream);
+ }
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ Map<String, Long> startTimes = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ String path = ReadWriteIOUtils.readString(inputStream);
+ long time = ReadWriteIOUtils.readLong(inputStream);
+ startTimes.put(path, time);
+ }
+ size = ReadWriteIOUtils.readInt(inputStream);
+ Map<String, Long> endTimes = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ String path = ReadWriteIOUtils.readString(inputStream);
+ long time = ReadWriteIOUtils.readLong(inputStream);
+ endTimes.put(path, time);
+ }
+ size = ReadWriteIOUtils.readInt(inputStream);
+ Set<String> mergeChanaged = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ String path = ReadWriteIOUtils.readString(inputStream);
+ mergeChanaged.add(path);
+ }
+ TsFileResource tsFileResource = new TsFileResource(startTimes, endTimes,
overflowChangeType,
+ baseDirIndex, relativePath);
+ tsFileResource.mergeChanged = mergeChanaged;
+ return tsFileResource;
+ }
+
public void setStartTime(String deviceId, long startTime) {
startTimeMap.put(deviceId, startTime);
@@ -155,7 +214,7 @@ public class TsFileResource implements Serializable {
return relativePath;
}
return new File(Directories.getInstance().getTsFileFolder(baseDirIndex),
- relativePath).getPath();
+ relativePath).getPath();
}
public int getBaseDirIndex() {
@@ -225,7 +284,7 @@ public class TsFileResource implements Serializable {
Map<String, Long> startTimeMapCopy = new HashMap<>(this.startTimeMap);
Map<String, Long> endTimeMapCopy = new HashMap<>(this.endTimeMap);
return new TsFileResource(startTimeMapCopy, endTimeMapCopy,
overflowChangeType,
- baseDirIndex, relativePath);
+ baseDirIndex, relativePath);
}
@Override
@@ -242,24 +301,28 @@ public class TsFileResource implements Serializable {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
TsFileResource fileNode = (TsFileResource) o;
return baseDirIndex == fileNode.baseDirIndex &&
- overflowChangeType == fileNode.overflowChangeType &&
- Objects.equals(relativePath, fileNode.relativePath) &&
- Objects.equals(startTimeMap, fileNode.startTimeMap) &&
- Objects.equals(endTimeMap, fileNode.endTimeMap) &&
- Objects.equals(mergeChanged, fileNode.mergeChanged);
+ overflowChangeType == fileNode.overflowChangeType &&
+ Objects.equals(relativePath, fileNode.relativePath) &&
+ Objects.equals(startTimeMap, fileNode.startTimeMap) &&
+ Objects.equals(endTimeMap, fileNode.endTimeMap) &&
+ Objects.equals(mergeChanged, fileNode.mergeChanged);
}
@Override
public String toString() {
return String.format(
- "TsFileResource [relativePath=%s,overflowChangeType=%s,
startTimeMap=%s,"
- + " endTimeMap=%s, mergeChanged=%s]",
- relativePath, overflowChangeType, startTimeMap, endTimeMap,
mergeChanged);
+ "TsFileResource [relativePath=%s,overflowChangeType=%s,
startTimeMap=%s,"
+ + " endTimeMap=%s, mergeChanged=%s]",
+ relativePath, overflowChangeType, startTimeMap, endTimeMap,
mergeChanged);
}
public OverflowChangeType getOverflowChangeType() {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStoreTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStoreTest.java
new file mode 100644
index 0000000..891695d
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStoreTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.filenode;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileNodeProcessorStoreTest {
+
+ private boolean isOverflowed;
+ private Map<String, Long> lastUpdateTimeMap;
+ private TsFileResource emptyTsFileResource;
+ private List<TsFileResource> newFileNodes;
+ private int numOfMergeFile;
+ private FileNodeProcessorStatus fileNodeProcessorStatus;
+
+ private FileNodeProcessorStore fileNodeProcessorStore;
+
+ @Before
+ public void setUp() throws Exception {
+ isOverflowed = true;
+ lastUpdateTimeMap = new HashMap<>();
+ for (int i = 0; i < 10; i++) {
+ lastUpdateTimeMap.put("d" + i, (long) i);
+ }
+ emptyTsFileResource = TsFileResourceTest.constructTsfileResource();
+ newFileNodes = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ newFileNodes.add(TsFileResourceTest.constructTsfileResource());
+ }
+ numOfMergeFile = 5;
+ fileNodeProcessorStatus = FileNodeProcessorStatus.MERGING_WRITE;
+ fileNodeProcessorStore = new FileNodeProcessorStore(isOverflowed,
lastUpdateTimeMap,
+ emptyTsFileResource, newFileNodes, fileNodeProcessorStatus,
numOfMergeFile);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+ @Test
+ public void testSerDeialize() throws Exception {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ fileNodeProcessorStore.serialize(outputStream);
+ ByteArrayInputStream inputStream = new
ByteArrayInputStream(outputStream.toByteArray());
+ FileNodeProcessorStore deFileNodeProcessorStore = FileNodeProcessorStore
+ .deSerialize(inputStream);
+
+ assertEquals(fileNodeProcessorStore.getLastUpdateTimeMap(),
+ deFileNodeProcessorStore.getLastUpdateTimeMap());
+ assertEquals(fileNodeProcessorStore.getNumOfMergeFile(),
+ deFileNodeProcessorStore.getNumOfMergeFile());
+ assertEquals(fileNodeProcessorStore.getFileNodeProcessorStatus(),
+ deFileNodeProcessorStore.getFileNodeProcessorStatus());
+ TsFileResourceTest.assertTsfileRecource(fileNodeProcessorStore.getEmptyTsFileResource(),
+ deFileNodeProcessorStore.getEmptyTsFileResource());
+ assertEquals(fileNodeProcessorStore.getNewFileNodes().size(),
+ deFileNodeProcessorStore.getNewFileNodes().size());
+ for (int i = 0; i < fileNodeProcessorStore.getNewFileNodes().size(); i++) {
+ TsFileResourceTest.assertTsfileRecource(fileNodeProcessorStore.getNewFileNodes().get(i),
+ deFileNodeProcessorStore.getNewFileNodes().get(i));
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/SerializeUtilTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/SerializeUtilTest.java
deleted file mode 100644
index c750439..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/SerializeUtilTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author liukun
- *
- */
-public class SerializeUtilTest {
-
- private String filePath = "serializeUtilTest";
-
- @Before
- public void setUp() throws Exception {
- EnvironmentUtils.closeStatMonitor();
- EnvironmentUtils.cleanDir(filePath);
-
- }
-
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanDir(filePath);
- }
-
- @Test
- public void testHashSet() {
- Set<String> overflowset = new HashSet<String>();
- overflowset.add("set1");
- overflowset.add("set2");
- overflowset.add("set3");
-
- SerializeUtil<Set<String>> serializeUtil = new SerializeUtil<>();
-
- try {
- serializeUtil.serialize(overflowset, filePath);
- } catch (IOException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- assertEquals(true, new File(filePath).exists());
-
- try {
- Set<String> readSet = serializeUtil.deserialize(filePath).orElse(new
HashSet<String>());
- assertEquals(overflowset, readSet);
- } catch (IOException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- }
-
- @Test
- public void testFileStore() {
- TsFileResource emptyTsFileResource = new
TsFileResource(OverflowChangeType.NO_CHANGE,
- null);
- List<TsFileResource> newFilenodes = new ArrayList<>();
- String deviceId = "d0.s0";
- for (int i = 1; i <= 3; i++) {
- // i * 100, i * 100 + 99
- TsFileResource node = new TsFileResource(OverflowChangeType.NO_CHANGE,
- "bufferfiletest" + i);
- node.setStartTime(deviceId, i * 100);
- node.setEndTime(deviceId, i * 100 + 99);
- newFilenodes.add(node);
- }
- FileNodeProcessorStatus fileNodeProcessorState =
FileNodeProcessorStatus.WAITING;
- Map<String, Long> lastUpdateTimeMap = new HashMap<>();
- lastUpdateTimeMap.put(deviceId, (long) 500);
- FileNodeProcessorStore fileNodeProcessorStore = new
FileNodeProcessorStore(false,
- lastUpdateTimeMap,
- emptyTsFileResource, newFilenodes, fileNodeProcessorState, 0);
-
- SerializeUtil<FileNodeProcessorStore> serializeUtil = new
SerializeUtil<>();
-
- try {
- serializeUtil.serialize(fileNodeProcessorStore, filePath);
- } catch (IOException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- assertEquals(true, new File(filePath).exists());
- try {
- FileNodeProcessorStore fileNodeProcessorStore2 =
serializeUtil.deserialize(filePath)
- .orElse(new FileNodeProcessorStore(false, new HashMap<>(),
- new TsFileResource(OverflowChangeType.NO_CHANGE, null),
- new ArrayList<TsFileResource>(),
- FileNodeProcessorStatus.NONE, 0));
- assertEquals(fileNodeProcessorStore.getLastUpdateTimeMap(),
- fileNodeProcessorStore2.getLastUpdateTimeMap());
- assertEquals(fileNodeProcessorStore.getEmptyTsFileResource(),
- fileNodeProcessorStore2.getEmptyTsFileResource());
- assertEquals(fileNodeProcessorStore.getNewFileNodes(),
- fileNodeProcessorStore2.getNewFileNodes());
- assertEquals(fileNodeProcessorStore.getNumOfMergeFile(),
- fileNodeProcessorStore2.getNumOfMergeFile());
- assertEquals(fileNodeProcessorStore.getFileNodeProcessorStatus(),
- fileNodeProcessorStore2.getFileNodeProcessorStatus());
- } catch (IOException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java
new file mode 100644
index 0000000..b66deea
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.filenode;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TsFileResourceTest {
+
+
+ private TsFileResource tsFileResource;
+
+ public static TsFileResource constructTsfileResource() {
+ TsFileResource tsFileResource;
+ String relativePath = "relativePath";
+ Map<String, Long> startTimes = new HashMap<>();
+ Map<String, Long> endTimes = new HashMap<>();
+
+ tsFileResource = new TsFileResource(OverflowChangeType.MERGING_CHANGE,
+ relativePath);
+ for (int i = 0; i < 10; i++) {
+ startTimes.put("d" + i, (long) i);
+ }
+ for (int i = 0; i < 10; i++) {
+ endTimes.put("d" + i, (long) (i + 10));
+ }
+ tsFileResource.setStartTimeMap(startTimes);
+ tsFileResource.setEndTimeMap(endTimes);
+ for (int i = 0; i < 5; i++) {
+ tsFileResource.addMergeChanged("d" + i);
+ }
+ return tsFileResource;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.tsFileResource = constructTsfileResource();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+ @Test
+ public void testSerDeialize() throws Exception {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(0);
+ tsFileResource.serialize(outputStream);
+ ByteArrayInputStream inputStream = new
ByteArrayInputStream(outputStream.toByteArray());
+ TsFileResource deTsfileResource = TsFileResource.deSerialize(inputStream);
+ assertTsfileRecource(tsFileResource, deTsfileResource);
+ }
+ @Test
+ public void testSerdeializeCornerCase() throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(0);
+ tsFileResource.setRelativePath(null);
+ tsFileResource.serialize(outputStream);
+ ByteArrayInputStream inputStream = new
ByteArrayInputStream(outputStream.toByteArray());
+ TsFileResource deTsfileResource = TsFileResource.deSerialize(inputStream);
+ assertTsfileRecource(tsFileResource,deTsfileResource);
+ }
+
+ public static void assertTsfileRecource(TsFileResource tsFileResource,
+ TsFileResource deTsfileResource) {
+ assertEquals(tsFileResource.getBaseDirIndex(),
deTsfileResource.getBaseDirIndex());
+ assertEquals(tsFileResource.getRelativePath(),
deTsfileResource.getRelativePath());
+ assertEquals(tsFileResource.getOverflowChangeType(),
deTsfileResource.getOverflowChangeType());
+ assertEquals(tsFileResource.getStartTimeMap(),
deTsfileResource.getStartTimeMap());
+ assertEquals(tsFileResource.getEndTimeMap(),
deTsfileResource.getEndTimeMap());
+ assertEquals(tsFileResource.getMergeChanged(),
deTsfileResource.getMergeChanged());
+ }
+}
\ No newline at end of file