This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch deletion_expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/deletion_expr by this push:
new 7e5bcc9281d add ModificationFile
7e5bcc9281d is described below
commit 7e5bcc9281d5fca9d04e2ac292df229fafc58228
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Sep 19 15:29:22 2024 +0800
add ModificationFile
---
.../org/apache/iotdb/db/expr/DeletionExprMain.java | 4 +-
.../dataregion/modification/ModEntry.java | 35 +++++
.../dataregion/modification/ModificationFile.java | 163 +++++++++++++++++++++
3 files changed, 200 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java
index b835456f871..a4dfcfc2628 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java
@@ -282,8 +282,8 @@ public class DeletionExprMain {
// 24 * 60 * 60 * 1000 * 1000L,
// 2 * 24 * 60 * 60 * 1000 * 1000L,
// 3 * 24 * 60 * 60 * 1000 * 1000L,
- 4 * 24 * 60 * 60 * 1000 * 1000L,
-// 5 * 24 * 60 * 60 * 1000 * 1000L
+// 4 * 24 * 60 * 60 * 1000 * 1000L,
+ 5 * 24 * 60 * 60 * 1000 * 1000L
};
Configurer configurer = (expr, j) -> {
expr.maxTimestamp = exprArgs[j];
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java
index 5c700d09975..e3c1fd6aa47 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java
@@ -43,7 +43,15 @@ public abstract class ModEntry implements StreamSerializable
{
@Override
public void deserialize(InputStream stream) throws IOException {
+ this.timeRange = new TimeRange(ReadWriteIOUtils.readLong(stream),
+ ReadWriteIOUtils.readLong(stream));
+ }
+ public static ModEntry createFrom(InputStream stream) throws IOException {
+ ModType modType = ModType.deserialize(stream);
+ ModEntry entry = modType.newEntry();
+ entry.deserialize(stream);
+ return entry;
}
public enum ModType {
@@ -60,6 +68,21 @@ public abstract class ModEntry implements StreamSerializable
{
return typeNum;
}
+ public ModEntry newEntry() {
+ ModEntry entry;
+ switch (this) {
+ case TREE_DELETION:
+ entry = new TreeDeletionEntry();
+ break;
+ case TABLE_DELETION:
+ entry = new TableDeletionEntry();
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported mod type: " + this);
+ }
+ return entry;
+ }
+
public static ModType deserialize(ByteBuffer buffer) {
byte typeNum = buffer.get();
switch (typeNum) {
@@ -71,5 +94,17 @@ public abstract class ModEntry implements StreamSerializable
{
throw new IllegalArgumentException("Unknown ModType: " + typeNum);
}
}
+
+ public static ModType deserialize(InputStream stream) throws IOException {
+ byte typeNum = ReadWriteIOUtils.readByte(stream);
+ switch (typeNum) {
+ case 0x00:
+ return TABLE_DELETION;
+ case 0x01:
+ return TREE_DELETION;
+ default:
+ throw new IllegalArgumentException("Unknown ModType: " + typeNum);
+ }
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
new file mode 100644
index 00000000000..ba941305cec
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.modification;
+
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ModificationFile implements AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ModificationFile.class);
+
+ private final File file;
+ private FileChannel channel;
+ private OutputStream fileOutputStream;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Set<TsFileResource> tsFileRefs = new
ConcurrentSkipListSet<>(Comparator.comparing(
+ TsFileResource::getTsFilePath));
+
+ public ModificationFile(String filePath, TsFileResource firstResource)
throws IOException {
+ this.file = new File(filePath);
+ tsFileRefs.add(firstResource);
+ }
+
+ public void write(ModEntry entry) throws IOException {
+ if (fileOutputStream == null) {
+ fileOutputStream = new
BufferedOutputStream(Files.newOutputStream(file.toPath()));
+ channel = FileChannel.open(file.toPath());
+ }
+ entry.serialize(fileOutputStream);
+ channel.force(false);
+ }
+
+ public Iterator<ModEntry> getModIterator() throws IOException {
+ return new ModIterator();
+ }
+
+ @Override
+ public void close() throws Exception {
+ fileOutputStream.close();
+ fileOutputStream = null;
+ channel.close();
+ channel = null;
+ }
+
+ /**
+ * Add a TsFile to the reference set only if the set is not empty.
+ * @param tsFile TsFile to be added
+ * @return true if the TsFile is successfully added, false if the reference
set is empty.
+ */
+ public boolean addReference(TsFileResource tsFile) {
+ // adding references can be concurrent, but adding and removing cannot
+ lock.readLock().lock();
+ try {
+ if (!tsFileRefs.isEmpty()) {
+ tsFileRefs.add(tsFile);
+ return true;
+ }
+ return false;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Remove the references of the given TsFiles.
+ * @param tsFiles references to remove
+ * @return true if the ref set is empty after removal, false otherwise
+ */
+ public boolean removeReferences(List<TsFileResource> tsFiles) {
+ lock.writeLock().lock();
+ try {
+ tsFiles.forEach(tsFileRefs::remove);
+ return tsFileRefs.isEmpty();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public class ModIterator implements Iterator<ModEntry>, AutoCloseable {
+ private InputStream inputStream;
+ private ModEntry nextEntry;
+
+ public ModIterator() throws IOException {
+ this.inputStream = Files.newInputStream(file.toPath());
+ }
+
+ @Override
+ public void close() {
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ LOGGER.info("Cannot close mod file input stream of {}", file, e);
+ } finally {
+ inputStream = null;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (inputStream == null) {
+ return false;
+ }
+ if (nextEntry == null) {
+ try {
+ if (inputStream.available() == 0) {
+ return false;
+ }
+ nextEntry = ModEntry.createFrom(inputStream);
+ } catch (EOFException e) {
+ close();
+ } catch (IOException e) {
+ LOGGER.info("Cannot read mod file input stream of {}", file, e);
+ close();
+ }
+ }
+
+ return nextEntry != null;
+ }
+
+ @Override
+ public ModEntry next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ ModEntry ret = nextEntry;
+ nextEntry = null;
+ return ret;
+ }
+ }
+}