This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 60dc05ea16b237d832dc56a16061d2c3b57c6eb5 Author: HTHou <[email protected]> AuthorDate: Tue Jul 8 15:04:25 2025 +0800 dev --- .../main/java/org/apache/iotdb/ObjectExample.java | 67 ++++++++-------- .../dataregion/DataExecutionVisitor.java | 1 - .../planner/plan/node/write/InsertTabletNode.java | 4 + .../node/write/RelationalInsertTabletNode.java | 55 +++++++------ .../iotdb/db/storageengine/StorageEngine.java | 3 + .../db/storageengine/dataregion/DataRegion.java | 30 +++----- .../dataregion/memtable/AbstractMemTable.java | 2 +- .../dataregion/memtable/TsFileProcessor.java | 90 +++++++++------------- .../tsfile/generator/TsFileNameGenerator.java | 16 ++++ .../org/apache/iotdb/db/utils/ObjectWriter.java | 14 ++-- 10 files changed, 137 insertions(+), 145 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java index 3339d3aae5a..e4007f6f3b9 100644 --- a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java +++ b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java @@ -82,8 +82,7 @@ public class ObjectExample { true, 0, Files.readAllBytes( - Paths.get( - "/Users/jackietien/Downloads/2_1746622362350_fa24aa15233f4e76bcda789a5771f43f"))); + Paths.get("/Users/ht/Downloads/2_1746622362350_fa24aa15233f4e76bcda789a5771f43f"))); session.insert(tablet); tablet.reset(); @@ -104,38 +103,38 @@ public class ObjectExample { "/Users/jackietien/Downloads/2_1746622367063_8fb5ac8e21724140874195b60b878664"))); session.insert(tablet); tablet.reset(); - - tablet = new Tablet("tiff_table", columnNameList, dataTypeList, columnTypeList, 1); - rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 1); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "3"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue( - rowIndex, - 4, - true, - 0, - Files.readAllBytes(Paths.get("/Users/jackietien/Downloads/1751891240130.tiff"))); - session.insert(tablet); - tablet.reset(); - - tablet = new Tablet("tiff_table", columnNameList, dataTypeList, columnTypeList, 1); - rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, 2); - tablet.addValue(rowIndex, 0, "1"); - tablet.addValue(rowIndex, 1, "5"); - tablet.addValue(rowIndex, 2, "4"); - tablet.addValue(rowIndex, 3, 37.6F); - tablet.addValue( - rowIndex, - 4, - true, - 0, - Files.readAllBytes(Paths.get("/Users/jackietien/Downloads/1751891242743.tiff"))); - session.insert(tablet); - tablet.reset(); + // + // tablet = new Tablet("tiff_table", columnNameList, dataTypeList, columnTypeList, 1); + // rowIndex = tablet.getRowSize(); + // tablet.addTimestamp(rowIndex, 1); + // tablet.addValue(rowIndex, 0, "1"); + // tablet.addValue(rowIndex, 1, "5"); + // tablet.addValue(rowIndex, 2, "3"); + // tablet.addValue(rowIndex, 3, 37.6F); + // tablet.addValue( + // rowIndex, + // 4, + // true, + // 0, + // Files.readAllBytes(Paths.get("/Users/jackietien/Downloads/1751891240130.tiff"))); + // session.insert(tablet); + // tablet.reset(); + // + // tablet = new Tablet("tiff_table", columnNameList, dataTypeList, columnTypeList, 1); + // rowIndex = tablet.getRowSize(); + // tablet.addTimestamp(rowIndex, 2); + // tablet.addValue(rowIndex, 0, "1"); + // tablet.addValue(rowIndex, 1, "5"); + // tablet.addValue(rowIndex, 2, "4"); + // tablet.addValue(rowIndex, 3, 37.6F); + // tablet.addValue( + // rowIndex, + // 4, + // true, + // 0, + // Files.readAllBytes(Paths.get("/Users/jackietien/Downloads/1751891242743.tiff"))); + // session.insert(tablet); + // tablet.reset(); } catch (IoTDBConnectionException e) { e.printStackTrace(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java index 34f051be137..7431aa9a79d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java @@ -91,7 +91,6 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> { @Override public TSStatus visitRelationalInsertTablet( RelationalInsertTabletNode node, DataRegion dataRegion) { - node.handleObjectTypeValue(); return visitInsertTablet(node, dataRegion); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index a1f909b2b8e..18d79671c79 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -167,6 +167,10 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { return columns; } + public Object[] getColumnsAndConvertObjects() { + return columns; + } + public void setColumns(Object[] columns) { this.columns = columns; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index 9f50b13b25c..a7199f0b310 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -39,7 +39,6 @@ import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -60,7 +59,7 @@ public class RelationalInsertTabletNode extends InsertTabletNode { private boolean singleDevice; - private List<List<FileNode>> fileNodesList; + private Object[] convertedColumns; public RelationalInsertTabletNode( PlanNodeId id, @@ -112,14 +111,6 @@ public class RelationalInsertTabletNode extends InsertTabletNode { this.singleDevice = true; } - public void setFileNodeList(List<List<FileNode>> fileNodesList) { - this.fileNodesList = fileNodesList; - } - - public List<List<FileNode>> getFileNodeList() { - return fileNodesList; - } - public List<Binary[]> getObjectColumns() { List<Binary[]> objectColumns = new ArrayList<>(); for (int i = 0; i < columns.length; i++) { @@ -395,29 +386,35 @@ public class RelationalInsertTabletNode extends InsertTabletNode { } } - public void handleObjectTypeValue() { - List<List<FileNode>> fileNodesList = new ArrayList<>(); - for (int i = 0; i < dataTypes.length; i++) { + public boolean hasObjectValue() { + for (int i = 0; i < columns.length; i++) { if (dataTypes[i] == TSDataType.OBJECT) { - List<FileNode> fileNodes = new ArrayList<>(); - for (int j = 0; j < times.length; j++) { - Binary value = ((Binary[]) columns[i])[j]; - boolean isEoF = value.getValues()[0] == 1; - byte[] offsetBytes = new byte[8]; - System.arraycopy(value.getValues(), 1, offsetBytes, 0, 8); - long offset = BytesUtils.bytesToLong(offsetBytes); - byte[] content = new byte[value.getLength() - 9]; - System.arraycopy(value.getValues(), 9, content, 0, value.getLength() - 9); - FileNode fileNode = new FileNode(isEoF, offset, content); - fileNode.setSearchIndex(this.getSearchIndex()); - fileNodes.add(fileNode); - ((Binary[]) columns[i])[j] = null; + return true; + } + } + return false; + } + + public void setConvertedObjectValue(Binary objectValue, int column, int row) { + if (convertedColumns == null) { + convertedColumns = new Object[columns.length]; + for (int i = 0; i < columns.length; i++) { + if (dataTypes[i] == TSDataType.OBJECT) { + convertedColumns[i] = new Binary[((Binary[]) columns[i]).length]; + } else { + convertedColumns[i] = columns[i]; } - fileNodesList.add(fileNodes); } } - if (!fileNodesList.isEmpty()) { - this.fileNodesList = fileNodesList; + ((Binary[]) convertedColumns[column])[row] = objectValue; + } + + @Override + public Object[] getColumnsAndConvertObjects() { + if (!hasObjectValue()) { + return columns; + } else { + return convertedColumns; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 2c2281dcf66..7edd40e42d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -105,6 +105,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -155,6 +156,8 @@ public class StorageEngine implements IService { private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager(); + public final AtomicLong objectFileId = new AtomicLong(0); + private StorageEngine() {} public static StorageEngine getInstance() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 25bc1484a78..db5fbe98729 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -347,8 +347,6 @@ public class DataRegion implements IDataRegionForQuery { private ILoadDiskSelector ordinaryLoadDiskSelector; private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector; - public AtomicLong objectFileId = new AtomicLong(0); - /** * Construct a database processor. * @@ -821,14 +819,6 @@ public class DataRegion implements IDataRegionForQuery { File[] objectFileInThisFolder = fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), ".bin"); - for (File f : objectFileInThisFolder) { - objectFileId.updateAndGet( - current -> - Math.max( - current, - Long.parseLong( - f.getName().substring(0, f.getName().length() - 4).split("-")[2]))); - } File[] objectTmpFileInThisFolder = fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), ".bin.tmp"); for (File f : objectTmpFileInThisFolder) { @@ -868,14 +858,18 @@ public class DataRegion implements IDataRegionForQuery { } else { File[] objectFileInThisFolder = fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), ".bin"); - for (File f : objectFileInThisFolder) { - objectFileId.updateAndGet( - current -> - Math.max( - current, - Long.parseLong( - f.getName().substring(0, f.getName().length() - 4).split("-")[2]))); - } + // for (File f : objectFileInThisFolder) { + // StorageEngine.getInstance() + // .objectFileId + // .updateAndGet( + // current -> + // Math.max( + // current, + // Long.parseLong( + // f.getName() + // .substring(0, f.getName().length() - 4) + // .split("-")[2]))); + // } File[] objectTmpFileInThisFolder = fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), ".bin.tmp"); for (File f : objectTmpFileInThisFolder) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index d550781858f..fea43e65c80 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -356,7 +356,7 @@ public abstract class AbstractMemTable implements IMemTable { createAlignedMemChunkGroupIfNotExistAndGet(deviceID, schemaList); memChunkGroup.writeTablet( insertTabletNode.getTimes(), - insertTabletNode.getColumns(), + insertTabletNode.getColumnsAndConvertObjects(), insertTabletNode.getBitMaps(), schemaList, splitStart, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 5b4e09b0154..4a270779fe8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -43,7 +43,6 @@ import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; @@ -96,12 +95,14 @@ import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -1021,7 +1022,9 @@ public class TsFileProcessor { // TEXT data size if (dataType.isBinary()) { Binary[] binColumn = (Binary[]) column; - memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end, null); + if (dataType != TSDataType.OBJECT) { + memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end, null); + } } } @@ -2346,57 +2349,34 @@ public class TsFileProcessor { InsertTabletNode insertTabletNode, List<int[]> rangeList, TSStatus[] results) throws WriteProcessException { if (insertTabletNode instanceof RelationalInsertTabletNode) { - RelationalInsertTabletNode relationalInsertTabletNode = - (RelationalInsertTabletNode) insertTabletNode; - List<List<FileNode>> fileNodesList = relationalInsertTabletNode.getFileNodeList(); - if (fileNodesList != null) { - for (int j = 0; j < fileNodesList.size(); j++) { - List<FileNode> fileNodeList = fileNodesList.get(j); - for (int i = 0; i < fileNodeList.size(); i++) { - FileNode fileNode = fileNodeList.get(i); - String objectFileName = - insertTabletNode.getTimes()[i] - + "-" - + config.getDataNodeId() - + "-" - + dataRegionInfo.getDataRegion().objectFileId.incrementAndGet() - + ".bin"; - String objectTmpFileName = objectFileName + ".tmp"; + RelationalInsertTabletNode node = (RelationalInsertTabletNode) insertTabletNode; + for (int j = 0; j < node.getColumns().length; j++) { + if (node.getDataType(j) == TSDataType.OBJECT) { + Binary[] objectColumn = node.getObjectColumns().get(j); + for (int i = 0; i < node.getTimes().length; i++) { + ByteBuffer buffer = ByteBuffer.wrap(objectColumn[i].getValues()).duplicate(); + String relativePathString = ReadWriteIOUtils.readString(buffer); + boolean isEoF = ReadWriteIOUtils.readBool(buffer); + long offset = ReadWriteIOUtils.readLong(buffer); + byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); + String relativeTmpPathString = relativePathString + ".tmp"; + String objectFileDir; File objectTmpFile; - String relativePathString = - dataRegionInfo.getDataRegion().getDatabaseName() - + File.separator - + dataRegionInfo.getDataRegion().getDataRegionId() - + File.separator - + tsFileResource.getTsFileID().timePartitionId - + File.separator - + objectFileName; try { - String baseDir = TierManager.getInstance().getNextFolderForObjectFile(); - String objectFileDir = - baseDir - + File.separator - + dataRegionInfo.getDataRegion().getDatabaseName() - + File.separator - + dataRegionInfo.getDataRegion().getDataRegionId() - + File.separator - + tsFileResource.getTsFileID().timePartitionId; - + objectFileDir = TierManager.getInstance().getNextFolderForObjectFile(); objectTmpFile = - FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectTmpFileName); + FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeTmpPathString); try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) { - writer.write(fileNode); + writer.write(isEoF, offset, content); } - fileNode.setFilePath(relativePathString); } catch (Exception e) { results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); throw new WriteProcessException(e); } // TODO:[OBJECT] write file node wal - if (fileNode.isEOF()) { + if (isEoF) { File objectFile = - FSFactoryProducer.getFSFactory() - .getFile(objectTmpFile.getParentFile(), objectFileName); + FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativePathString); try { Files.move( objectTmpFile.toPath(), @@ -2411,19 +2391,23 @@ public class TsFileProcessor { System.arraycopy( BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, Long.BYTES); System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); - (relationalInsertTabletNode.getObjectColumns().get(j))[i] = new Binary(valueBytes); + node.setConvertedObjectValue(new Binary(valueBytes), j, i); + } else { + node.setConvertedObjectValue(null, j, i); } - WALFlushListener walFlushListener; - try { - walFlushListener = walNode.log(workMemTable.getMemTableId(), fileNode); - if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { - throw walFlushListener.getCause(); - } - } catch (Exception e) { - results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); - throw new WriteProcessException(e); - } + // WALFlushListener walFlushListener; + // try { + // walFlushListener = walNode.log(workMemTable.getMemTableId(), fileNode); + // if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) + // { + // throw walFlushListener.getCause(); + // } + // } catch (Exception e) { + // results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, + // e.getMessage()); + // throw new WriteProcessException(e); + // } } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java index ca0088479e9..aa5f4beafbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java @@ -21,7 +21,9 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.generator; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; @@ -167,6 +169,20 @@ public class TsFileNameGenerator { } } + public static String generateObjectFilePath( + long time, int datanodeId, String databaseName, int dataRegionId) { + String objectFileName = time + ".bin"; + String relativePathString = + databaseName + + File.separator + + dataRegionId + + File.separator + + TimePartitionUtils.getTimePartitionId(time) + + File.separator + + objectFileName; + return relativePathString; + } + @TestOnly public static TsFileResource increaseCrossCompactionCnt(TsFileResource tsFileResource) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java index 2bc85453744..7ef14b6a39c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.utils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -60,18 +59,15 @@ public class ObjectWriter implements AutoCloseable { fos = new FileOutputStream(filePath, true); } - public void write(FileNode fileNode) throws IOException { - if (file.length() != fileNode.getOffset()) { + public void write(boolean isEoF, long offset, byte[] content) throws IOException { + if (file.length() != offset) { throw new IOException( - "The file length " - + file.length() - + " is not equal to the offset " - + fileNode.getOffset()); + "The file length " + file.length() + " is not equal to the offset " + offset); } - if (file.length() + fileNode.getContent().length > config.getMaxObjectSizeInByte()) { + if (file.length() + content.length > config.getMaxObjectSizeInByte()) { throw new IOException("The file length is larger than max_object_file_size_in_bytes"); } - fos.write(fileNode.getContent()); + fos.write(content); } @Override
