This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0921520e1c40f3d0b9946e22dcc1765475352463 Author: HTHou <[email protected]> AuthorDate: Tue Jul 8 16:26:42 2025 +0800 refactor write --- .../main/java/org/apache/iotdb/ObjectExample.java | 3 +- .../dataregion/DataExecutionVisitor.java | 13 ++++ .../execution/executor/RegionWriteExecutor.java | 14 ++++ .../plan/planner/plan/node/PlanNodeType.java | 6 +- .../plan/planner/plan/node/PlanVisitor.java | 5 ++ .../planner/plan/node/write/InsertTabletNode.java | 8 +-- .../node/write/{FileNode.java => ObjectNode.java} | 80 ++++++++++++++-------- .../node/write/RelationalInsertTabletNode.java | 71 ++++++++++++------- .../db/storageengine/dataregion/DataRegion.java | 24 +++++++ .../dataregion/memtable/AbstractMemTable.java | 2 +- .../dataregion/memtable/TsFileProcessor.java | 8 +-- .../tsfile/generator/TsFileNameGenerator.java | 22 +++--- .../dataregion/wal/buffer/WALBuffer.java | 4 +- .../dataregion/wal/buffer/WALEntry.java | 6 +- .../dataregion/wal/buffer/WALInfoEntry.java | 4 +- .../dataregion/wal/node/IWALNode.java | 4 +- .../dataregion/wal/node/WALFakeNode.java | 4 +- .../storageengine/dataregion/wal/node/WALNode.java | 8 +-- 18 files changed, 186 insertions(+), 100 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java index e4007f6f3b9..36cab4c8d0d 100644 --- a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java +++ b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java @@ -99,8 +99,7 @@ public class ObjectExample { true, 0, Files.readAllBytes( - Paths.get( - "/Users/jackietien/Downloads/2_1746622367063_8fb5ac8e21724140874195b60b878664"))); + Paths.get("/Users/ht/Downloads/2_1746622367063_8fb5ac8e21724140874195b60b878664"))); session.insert(tablet); tablet.reset(); // diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java index 7431aa9a79d..6a6568611d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode; @@ -292,4 +293,16 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> { node.getDeleteDataNode().markAsGeneratedByPipe(); return node.getDeleteDataNode().accept(this, context); } + + @Override + public TSStatus visitWriteObjectFile(ObjectNode node, DataRegion dataRegion) { + try { + dataRegion.writeObject(node); + dataRegion.insertSeparatorToWAL(); + return StatusUtils.OK; + } catch (final Exception e) { + LOGGER.error("Error in executing plan node: {}", node, e); + return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode()); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java index 60dda197578..e3c2d3b4842 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java @@ -66,6 +66,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode; @@ -370,6 +371,19 @@ public class RegionWriteExecutor { } } + @Override + public RegionExecutionResult visitWriteObjectFile( + final ObjectNode node, final WritePlanNodeExecutionContext context) { + // data deletion don't need to block data insertion, but there are some creation operation + // require write lock on data region. + context.getRegionWriteValidationRWLock().writeLock().lock(); + try { + return super.visitWriteObjectFile(node, context); + } finally { + context.getRegionWriteValidationRWLock().writeLock().unlock(); + } + } + @Override public RegionExecutionResult visitDeleteTimeseries( final DeleteTimeSeriesNode node, final WritePlanNodeExecutionContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index a90372bcd9c..421a5396807 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -106,12 +106,12 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueries import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode; @@ -358,7 +358,7 @@ public enum PlanNodeType { case 2003: return RelationalDeleteDataNode.deserializeFromWAL(stream); case 2004: - return FileNode.deserializeFromWAL(stream); + return ObjectNode.deserializeFromWAL(stream); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } @@ -388,7 +388,7 @@ public enum PlanNodeType { case 2004: // TODO haonan only be called by follower, should call normal dserialize instead from // deserializeFromWAL - return FileNode.deserializeFromWAL(buffer); + return ObjectNode.deserializeFromWAL(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 4ff7f9f62cb..609aacf8662 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -115,6 +115,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode; @@ -620,6 +621,10 @@ public abstract class PlanVisitor<R, C> { return visitPlan(node, context); } + public R visitWriteObjectFile(ObjectNode node, C context) { + return visitPlan(node, context); + } + ///////////////////////////////////////////////////////////////////////////////////////////////// // Pipe Related Node ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 5eebc85f56f..5b495e6c9e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -167,10 +167,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { return columns; } - public Object[] getColumnsAndConvertObjects() { - return columns; - } - public void setColumns(Object[] columns) { this.columns = columns; } @@ -299,7 +295,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { return splitMap; } - private List<WritePlanNode> doSplit(Map<TRegionReplicaSet, List<Integer>> splitMap) { + protected List<WritePlanNode> doSplit(Map<TRegionReplicaSet, List<Integer>> splitMap) { List<WritePlanNode> result = new ArrayList<>(); if (splitMap.size() == 1) { @@ -336,7 +332,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { subTimes.length); } - private WritePlanNode generateOneSplit(Map.Entry<TRegionReplicaSet, List<Integer>> entry) { + protected WritePlanNode generateOneSplit(Map.Entry<TRegionReplicaSet, List<Integer>> entry) { List<Integer> locs; // generate a new times and values locs = entry.getValue(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java similarity index 72% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java index 5ea2c64f2c4..6b989ca564a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; @@ -44,7 +45,7 @@ import java.util.List; import java.util.Optional; // TODO:[OBJECT] WAL serde -public class FileNode extends SearchNode implements WALEntryValue { +public class ObjectNode extends SearchNode implements WALEntryValue { private final boolean isEOF; @@ -54,18 +55,25 @@ public class FileNode extends SearchNode implements WALEntryValue { private String filePath; - public FileNode(boolean isEOF, long offset, byte[] content) { + private int contentLength; + + private TRegionReplicaSet dataRegionReplicaSet; + + public ObjectNode(boolean isEOF, long offset, byte[] content, String filePath) { super(new PlanNodeId("")); this.isEOF = isEOF; this.offset = offset; + this.filePath = filePath; this.content = content; + this.contentLength = content.length; } - public FileNode(boolean isEOF, long offset, byte[] content, String filePath) { + public ObjectNode(boolean isEOF, long offset, int contentLength, String filePath) { super(new PlanNodeId("")); this.isEOF = isEOF; this.offset = offset; this.filePath = filePath; + this.contentLength = contentLength; } public boolean isEOF() { @@ -95,8 +103,8 @@ public class FileNode extends SearchNode implements WALEntryValue { buffer.putLong(searchIndex); buffer.put((byte) (isEOF ? 1 : 0)); buffer.putLong(offset); - buffer.putInt(content.length); WALWriteUtils.write(filePath, buffer); + buffer.putInt(content.length); } @Override @@ -109,38 +117,28 @@ public class FileNode extends SearchNode implements WALEntryValue { + ReadWriteIOUtils.sizeToWrite(filePath); } - public static FileNode deserializeFromWAL(DataInputStream stream) throws IOException { + public static ObjectNode deserializeFromWAL(DataInputStream stream) throws IOException { // TODO haonan only be called in recovery, should only deserialize relativePath, offset, eof, // length long searchIndex = stream.readLong(); boolean isEOF = stream.readByte() == 1; long offset = stream.readLong(); - int contentLength = stream.readInt(); String filePath = ReadWriteIOUtils.readString(stream); - Optional<File> objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath); - byte[] contents = new byte[contentLength]; - if (objectFile.isPresent()) { - try (RandomAccessFile raf = new RandomAccessFile(filePath, "r")) { - raf.seek(offset); - raf.read(contents); - } - } else { - throw new ObjectFileNotExist(filePath); - } + int contentLength = stream.readInt(); - FileNode fileNode = new FileNode(isEOF, offset, contents, filePath); - fileNode.setSearchIndex(searchIndex); + ObjectNode objectNode = new ObjectNode(isEOF, offset, contentLength, filePath); + objectNode.setSearchIndex(searchIndex); - return fileNode; + return objectNode; } - public static FileNode deserializeFromWAL(ByteBuffer buffer) { + public static ObjectNode deserializeFromWAL(ByteBuffer buffer) { long searchIndex = buffer.getLong(); boolean isEOF = buffer.get() == 1; long offset = buffer.getLong(); - int contentLength = buffer.getInt(); String filePath = ReadWriteIOUtils.readString(buffer); Optional<File> objectFile = TierManager.getInstance().getAbsoluteObjectFilePath(filePath); + int contentLength = buffer.getInt(); byte[] contents = new byte[contentLength]; if (objectFile.isPresent()) { try (RandomAccessFile raf = new RandomAccessFile(filePath, "r")) { @@ -153,14 +151,17 @@ public class FileNode extends SearchNode implements WALEntryValue { throw new ObjectFileNotExist(filePath); } - FileNode fileNode = new FileNode(isEOF, offset, contents, filePath); - fileNode.setSearchIndex(searchIndex); - return fileNode; + ObjectNode objectNode = new ObjectNode(isEOF, offset, contents, filePath); + objectNode.setSearchIndex(searchIndex); + return objectNode; } @Override public SearchNode merge(List<SearchNode> searchNodes) { - return null; + if (searchNodes.size() == 1) { + return searchNodes.get(0); + } + throw new UnsupportedOperationException("Merge is not supported"); } @Override @@ -178,7 +179,11 @@ public class FileNode extends SearchNode implements WALEntryValue { @Override public TRegionReplicaSet getRegionReplicaSet() { - return null; + return dataRegionReplicaSet; + } + + public void setDataRegionReplicaSet(TRegionReplicaSet dataRegionReplicaSet) { + this.dataRegionReplicaSet = dataRegionReplicaSet; } @Override @@ -205,10 +210,24 @@ public class FileNode extends SearchNode implements WALEntryValue { } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) {} + protected void serializeAttributes(ByteBuffer byteBuffer) { + getType().serialize(byteBuffer); + ReadWriteIOUtils.write(isEOF, byteBuffer); + ReadWriteIOUtils.write(offset, byteBuffer); + ReadWriteIOUtils.write(filePath, byteBuffer); + ReadWriteIOUtils.write(contentLength, byteBuffer); + byteBuffer.put(content); + } @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException {} + protected void serializeAttributes(DataOutputStream stream) throws IOException { + getType().serialize(stream); + ReadWriteIOUtils.write(isEOF, stream); + ReadWriteIOUtils.write(offset, stream); + ReadWriteIOUtils.write(filePath, stream); + ReadWriteIOUtils.write(contentLength, stream); + stream.write(content); + } @Override public PlanNodeType getType() { @@ -219,4 +238,9 @@ public class FileNode extends SearchNode implements WALEntryValue { public long getMemorySize() { return content.length; } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitWriteObjectFile(this, context); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index a7199f0b310..686715cd8c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -30,7 +30,9 @@ import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.tsfile.enums.TSDataType; @@ -39,6 +41,7 @@ import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -47,10 +50,12 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; public class RelationalInsertTabletNode extends InsertTabletNode { @@ -386,35 +391,51 @@ public class RelationalInsertTabletNode extends InsertTabletNode { } } - public boolean hasObjectValue() { - for (int i = 0; i < columns.length; i++) { - if (dataTypes[i] == TSDataType.OBJECT) { - return true; - } - } - return false; - } - - public void setConvertedObjectValue(Binary objectValue, int column, int row) { - if (convertedColumns == null) { - convertedColumns = new Object[columns.length]; - for (int i = 0; i < columns.length; i++) { - if (dataTypes[i] == TSDataType.OBJECT) { - convertedColumns[i] = new Binary[((Binary[]) columns[i]).length]; - } else { - convertedColumns[i] = columns[i]; + @Override + protected List<WritePlanNode> doSplit(Map<TRegionReplicaSet, List<Integer>> splitMap) { + List<WritePlanNode> result = new ArrayList<>(); + + if (splitMap.size() == 1) { + final Entry<TRegionReplicaSet, List<Integer>> entry = splitMap.entrySet().iterator().next(); + if (entry.getValue().size() == 2) { + // Avoid using system arraycopy when there is no need to split + setRange(entry.getValue()); + setDataRegionReplicaSet(entry.getKey()); + for (int i = 0; i < columns.length; i++) { + if (dataTypes[i] == TSDataType.OBJECT) { + for (int j = 0; j < times.length; j++) { + byte[] binary = ((Binary[]) columns[i])[j].getValues(); + ByteBuffer buffer = ByteBuffer.wrap(binary); + boolean isEoF = buffer.get() == 1; + long offset = buffer.getLong(); + byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); + String relativePath = + TsFileNameGenerator.generateObjectFilePath(times[j], getDeviceID(j)); + ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath); + objectNode.setDataRegionReplicaSet(entry.getKey()); + result.add(objectNode); + if (isEoF) { + byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); + byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; + System.arraycopy( + BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES); + System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); + ((Binary[]) columns[i])[j] = new Binary(valueBytes); + } else { + ((Binary[]) columns[i])[j] = null; + } + } + } } + result.add(this); + return result; } } - ((Binary[]) convertedColumns[column])[row] = objectValue; - } - @Override - public Object[] getColumnsAndConvertObjects() { - if (!hasObjectValue()) { - return columns; - } else { - return convertedColumns; + for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : splitMap.entrySet()) { + // TODO: add ObjectNode for split + result.add(generateOneSplit(entry)); } + return result; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index db5fbe98729..f80a853ad6d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -76,6 +76,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy; @@ -150,6 +151,7 @@ import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.ObjectWriter; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -174,6 +176,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -3074,6 +3077,27 @@ public class DataRegion implements IDataRegionForQuery { } } + public void writeObject(ObjectNode objectNode) throws Exception { + writeLock("writeObject"); + try { + String relativeTmpPathString = objectNode.getFilePath() + ".tmp"; + String objectFileDir = TierManager.getInstance().getNextFolderForObjectFile(); + File objectTmpFile = + FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeTmpPathString); + try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) { + writer.write(objectNode.isEOF(), objectNode.getOffset(), objectNode.getContent()); + } + if (objectNode.isEOF()) { + File objectFile = + FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectNode.getFilePath()); + Files.move( + objectTmpFile.toPath(), objectFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } finally { + writeUnlock(); + } + } + /** * Load a new tsfile to unsequence dir. * diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index fea43e65c80..d550781858f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -356,7 +356,7 @@ public abstract class AbstractMemTable implements IMemTable { createAlignedMemChunkGroupIfNotExistAndGet(deviceID, schemaList); memChunkGroup.writeTablet( insertTabletNode.getTimes(), - insertTabletNode.getColumnsAndConvertObjects(), + insertTabletNode.getColumns(), insertTabletNode.getBitMaps(), schemaList, splitStart, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 4a270779fe8..878eda4d071 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -570,7 +570,6 @@ public class TsFileProcessor { throws WriteProcessException { ensureMemTable(infoForMetrics); - handleWriteObject(insertTabletNode, rangeList, results); long[] memIncrements = scheduleMemoryBlock(insertTabletNode, rangeList, results, noFailure, infoForMetrics); @@ -1022,9 +1021,7 @@ public class TsFileProcessor { // TEXT data size if (dataType.isBinary()) { Binary[] binColumn = (Binary[]) column; - if (dataType != TSDataType.OBJECT) { - memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end, null); - } + memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end, null); } } @@ -2391,9 +2388,6 @@ public class TsFileProcessor { System.arraycopy( BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, Long.BYTES); System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); - node.setConvertedObjectValue(new Binary(valueBytes), j, i); - } else { - node.setConvertedObjectValue(null, j, i); } // WALFlushListener walFlushListener; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java index aa5f4beafbf..4208d7f4092 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java @@ -21,15 +21,14 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.generator; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.fsFactory.FSFactory; import org.slf4j.Logger; @@ -169,18 +168,15 @@ public class TsFileNameGenerator { } } - public static String generateObjectFilePath( - long time, int datanodeId, String databaseName, int dataRegionId) { + public static String generateObjectFilePath(long time, IDeviceID iDeviceID) { String objectFileName = time + ".bin"; - String relativePathString = - databaseName - + File.separator - + dataRegionId - + File.separator - + TimePartitionUtils.getTimePartitionId(time) - + File.separator - + objectFileName; - return relativePathString; + Object[] segments = iDeviceID.getSegments(); + StringBuilder relativePathString = new StringBuilder(); + for (Object segment : segments) { + relativePathString.append(segment.toString().toLowerCase()).append(File.separator); + } + relativePathString.append(objectFileName); + return relativePathString.toString(); } @TestOnly diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java index f0a95a97705..951df043750 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java @@ -26,8 +26,8 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint; @@ -334,7 +334,7 @@ public class WALBuffer extends AbstractWALBuffer { } else if (walEntry.getType() == WALEntryType.RELATIONAL_DELETE_DATA_NODE) { searchIndex = ((RelationalDeleteDataNode) walEntry.getValue()).getSearchIndex(); } else if (walEntry.getType() == WALEntryType.OBJECT_FILE_NODE) { - searchIndex = ((FileNode) walEntry.getValue()).getSearchIndex(); + searchIndex = ((ObjectNode) walEntry.getValue()).getSearchIndex(); } else { searchIndex = ((InsertNode) walEntry.getValue()).getSearchIndex(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java index af00ab29781..f91e6b64265 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java @@ -23,10 +23,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; @@ -78,7 +78,7 @@ public abstract class WALEntry implements SerializedSize { this.type = WALEntryType.CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE; } else if (value instanceof RelationalDeleteDataNode) { this.type = WALEntryType.RELATIONAL_DELETE_DATA_NODE; - } else if (value instanceof FileNode) { + } else if (value instanceof ObjectNode) { this.type = WALEntryType.OBJECT_FILE_NODE; } else { throw new RuntimeException("Unknown WALEntry type"); @@ -138,7 +138,7 @@ public abstract class WALEntry implements SerializedSize { value = (ContinuousSameSearchIndexSeparatorNode) PlanNodeType.deserializeFromWAL(stream); break; case OBJECT_FILE_NODE: - value = (FileNode) PlanNodeType.deserializeFromWAL(stream); + value = (ObjectNode) PlanNodeType.deserializeFromWAL(stream); break; default: throw new RuntimeException("Unknown WALEntry type " + type); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java index 57157a0092b..791ef63a752 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java @@ -21,9 +21,9 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.buffer; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; @@ -169,7 +169,7 @@ public class WALInfoEntry extends WALEntry { case MEMORY_TABLE_CHECKPOINT: return RamUsageEstimator.sizeOfObject(value); case OBJECT_FILE_NODE: - return ((FileNode) value).serializedSize(); + return ((ObjectNode) value).serializedSize(); default: throw new RuntimeException("Unsupported wal entry type " + type); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java index aa8959e5abf..a8fbbee0dc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java @@ -23,10 +23,10 @@ import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.iot.log.ConsensusReqReader; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; @@ -55,7 +55,7 @@ public interface IWALNode extends FlushListener, AutoCloseable, ConsensusReqRead /** Log BatchDoneNode */ WALFlushListener log(long memTableId, ContinuousSameSearchIndexSeparatorNode separatorNode); - WALFlushListener log(long memTableId, FileNode fileNode); + WALFlushListener log(long memTableId, ObjectNode objectNode); /** Callback when memTable created. */ void onMemTableCreated(IMemTable memTable, String targetTsFile); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java index 5a2cbbc12d2..7cc9a60b4d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java @@ -21,10 +21,10 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException; @@ -84,7 +84,7 @@ public class WALFakeNode implements IWALNode { } @Override - public WALFlushListener log(long memTableId, FileNode fileNode) { + public WALFlushListener log(long memTableId, ObjectNode objectNode) { return getResult(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index d73075cdf29..c2af1493115 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -30,10 +30,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.StorageEngine; @@ -196,8 +196,8 @@ public class WALNode implements IWALNode { } @Override - public WALFlushListener log(long memTableId, FileNode fileNode) { - WALEntry walEntry = new WALInfoEntry(memTableId, fileNode); + public WALFlushListener log(long memTableId, ObjectNode objectNode) { + WALEntry walEntry = new WALInfoEntry(memTableId, objectNode); return log(walEntry); } @@ -786,7 +786,7 @@ public class WALNode implements IWALNode { // need to add WALEntryType + memtableId + relativePath, offset, eof, length + // filecontent // need to add IoTConsensusRequest instead of FileNode - tmpNodes.get().add((FileNode) walEntry.getValue()); + tmpNodes.get().add((ObjectNode) walEntry.getValue()); } else { tmpNodes.get().add(new IoTConsensusRequest(buffer)); }
