jt2594838 commented on code in PR #12744:
URL: https://github.com/apache/iotdb/pull/12744#discussion_r1678669069
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java:
##########
@@ -318,6 +145,234 @@ public void splitTsFileByDataPartition() throws
IOException, IllegalStateExcepti
}
}
+ private void processTimeChunkOrNonAlignedChunk(TsFileSequenceReader reader,
byte marker)
+ throws IOException {
+ long chunkOffset = reader.position();
+ timeChunkIndexOfCurrentValueColumn = pageIndex2TimesList.size();
+ consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData);
+ handleModification(offset2Deletions, chunkOffset);
+
+ ChunkHeader header = reader.readChunkHeader(marker);
+ String measurementId = header.getMeasurementID();
+ if (header.getDataSize() == 0) {
+ throw new TsFileRuntimeException(
+ String.format(
+ "Empty Nonaligned Chunk or Time Chunk with offset %d in TsFile
%s.",
+ chunkOffset, tsFile.getPath()));
+ }
+
+ isAligned =
+ ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+ == TsFileConstant.TIME_COLUMN_MASK);
+ IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset -
Byte.BYTES);
+ // When loading TsFile with Chunk in data zone but no matched ChunkMetadata
+ // at the end of file, this Chunk needs to be skipped.
+ if (chunkMetadata == null) {
+ reader.readChunk(-1, header.getDataSize());
+ return;
+ }
+ TTimePartitionSlot timePartitionSlot =
+ TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime());
+ ChunkData chunkData =
+ ChunkData.createChunkData(
+ isAligned, ((PlainDeviceID) curDevice).toStringID(), header,
timePartitionSlot);
+
+ if (!needDecodeChunk(chunkMetadata)) {
+ chunkData.setNotDecode();
+ chunkData.writeEntireChunk(reader.readChunk(-1, header.getDataSize()),
chunkMetadata);
+ if (isAligned) {
+ isTimeChunkNeedDecode = false;
+ pageIndex2ChunkData
+ .computeIfAbsent(1, o -> new ArrayList<>())
+ .add((AlignedChunkData) chunkData);
Review Comment:
Why does the page index start from 1 instead of 0?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils;
+
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AlignedSeriesBatchCompactionUtils {
+
+ private AlignedSeriesBatchCompactionUtils() {}
+
+ public static List<IMeasurementSchema> selectColumnBatchToCompact(
+ List<IMeasurementSchema> schemaList, Set<String> compactedMeasurements,
int batchSize) {
+ List<IMeasurementSchema> selectedColumnBatch = new ArrayList<>(batchSize);
+ for (IMeasurementSchema schema : schemaList) {
+ if (!isLargeDataType(schema.getType())) {
+ continue;
+ }
+ if (compactedMeasurements.contains(schema.getMeasurementId())) {
+ continue;
+ }
+ compactedMeasurements.add(schema.getMeasurementId());
+ selectedColumnBatch.add(schema);
+ if (selectedColumnBatch.size() >= batchSize) {
+ return selectedColumnBatch;
+ }
+ if (compactedMeasurements.size() == schemaList.size()) {
+ return selectedColumnBatch;
+ }
+ }
+ for (IMeasurementSchema schema : schemaList) {
+ if (compactedMeasurements.contains(schema.getMeasurementId())) {
+ continue;
+ }
+ selectedColumnBatch.add(schema);
+ compactedMeasurements.add(schema.getMeasurementId());
+ if (selectedColumnBatch.size() >= batchSize) {
+ break;
+ }
+ if (compactedMeasurements.size() == schemaList.size()) {
+ break;
+ }
+ }
+ return selectedColumnBatch;
+ }
Review Comment:
TODO: select by chunk size of each series
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils;
+
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AlignedSeriesBatchCompactionUtils {
+
+ private AlignedSeriesBatchCompactionUtils() {}
+
+ public static List<IMeasurementSchema> selectColumnBatchToCompact(
+ List<IMeasurementSchema> schemaList, Set<String> compactedMeasurements,
int batchSize) {
+ List<IMeasurementSchema> selectedColumnBatch = new ArrayList<>(batchSize);
+ for (IMeasurementSchema schema : schemaList) {
+ if (!isLargeDataType(schema.getType())) {
+ continue;
+ }
+ if (compactedMeasurements.contains(schema.getMeasurementId())) {
+ continue;
+ }
+ compactedMeasurements.add(schema.getMeasurementId());
+ selectedColumnBatch.add(schema);
+ if (selectedColumnBatch.size() >= batchSize) {
+ return selectedColumnBatch;
+ }
+ if (compactedMeasurements.size() == schemaList.size()) {
+ return selectedColumnBatch;
+ }
+ }
+ for (IMeasurementSchema schema : schemaList) {
+ if (compactedMeasurements.contains(schema.getMeasurementId())) {
+ continue;
+ }
+ selectedColumnBatch.add(schema);
+ compactedMeasurements.add(schema.getMeasurementId());
+ if (selectedColumnBatch.size() >= batchSize) {
+ break;
+ }
+ if (compactedMeasurements.size() == schemaList.size()) {
+ break;
+ }
+ }
+ return selectedColumnBatch;
+ }
+
+ private static boolean isLargeDataType(TSDataType dataType) {
+ return dataType.equals(TSDataType.BLOB)
+ || dataType.equals(TSDataType.TEXT)
+ || dataType.equals(TSDataType.STRING);
+ }
+
+ public static void markAlignedChunkHasDeletion(
+ LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
+ readerAndChunkMetadataList) {
+ for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair :
readerAndChunkMetadataList) {
+ List<AlignedChunkMetadata> alignedChunkMetadataList = pair.getRight();
+ markAlignedChunkHasDeletion(alignedChunkMetadataList);
+ }
+ }
+
+ public static void markAlignedChunkHasDeletion(
+ List<AlignedChunkMetadata> alignedChunkMetadataList) {
+ for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList)
{
+ IChunkMetadata timeChunkMetadata =
alignedChunkMetadata.getTimeChunkMetadata();
+ for (IChunkMetadata iChunkMetadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
+ if (iChunkMetadata != null && iChunkMetadata.isModified()) {
+ timeChunkMetadata.setModified(true);
+ break;
+ }
+ }
+ }
+ }
+
+ public static AlignedChunkMetadata filterAlignedChunkMetadata(
+ AlignedChunkMetadata alignedChunkMetadata, List<String>
selectedMeasurements) {
+ List<IChunkMetadata> valueChunkMetadataList =
+ Arrays.asList(new IChunkMetadata[selectedMeasurements.size()]);
+
+ Map<String, Integer> measurementIndex = new HashMap<>();
+ for (int i = 0; i < selectedMeasurements.size(); i++) {
+ measurementIndex.put(selectedMeasurements.get(i), i);
+ }
+
+ for (IChunkMetadata chunkMetadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
+ if (chunkMetadata == null) {
+ continue;
+ }
+ if (measurementIndex.containsKey(chunkMetadata.getMeasurementUid())) {
+ valueChunkMetadataList.set(
+ measurementIndex.get(chunkMetadata.getMeasurementUid()),
chunkMetadata);
+ }
Review Comment:
How about using `Map.computeIfPresent` here? It would avoid the double lookup (`containsKey` followed by `get`) on `measurementIndex`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils;
+
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AlignedSeriesBatchCompactionUtils {
+
+ private AlignedSeriesBatchCompactionUtils() {}
+
+ public static List<IMeasurementSchema> selectColumnBatchToCompact(
+ List<IMeasurementSchema> schemaList, Set<String> compactedMeasurements,
int batchSize) {
+ List<IMeasurementSchema> selectedColumnBatch = new ArrayList<>(batchSize);
+ for (IMeasurementSchema schema : schemaList) {
+ if (!isLargeDataType(schema.getType())) {
+ continue;
+ }
+ if (compactedMeasurements.contains(schema.getMeasurementId())) {
+ continue;
+ }
+ compactedMeasurements.add(schema.getMeasurementId());
+ selectedColumnBatch.add(schema);
+ if (selectedColumnBatch.size() >= batchSize) {
+ return selectedColumnBatch;
+ }
+ if (compactedMeasurements.size() == schemaList.size()) {
+ return selectedColumnBatch;
+ }
+ }
+ for (IMeasurementSchema schema : schemaList) {
+ if (compactedMeasurements.contains(schema.getMeasurementId())) {
+ continue;
+ }
+ selectedColumnBatch.add(schema);
+ compactedMeasurements.add(schema.getMeasurementId());
+ if (selectedColumnBatch.size() >= batchSize) {
+ break;
+ }
+ if (compactedMeasurements.size() == schemaList.size()) {
+ break;
+ }
+ }
+ return selectedColumnBatch;
+ }
+
+ private static boolean isLargeDataType(TSDataType dataType) {
+ return dataType.equals(TSDataType.BLOB)
+ || dataType.equals(TSDataType.TEXT)
+ || dataType.equals(TSDataType.STRING);
+ }
+
+ public static void markAlignedChunkHasDeletion(
+ LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
+ readerAndChunkMetadataList) {
+ for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair :
readerAndChunkMetadataList) {
+ List<AlignedChunkMetadata> alignedChunkMetadataList = pair.getRight();
+ markAlignedChunkHasDeletion(alignedChunkMetadataList);
+ }
+ }
+
+ public static void markAlignedChunkHasDeletion(
+ List<AlignedChunkMetadata> alignedChunkMetadataList) {
+ for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList)
{
+ IChunkMetadata timeChunkMetadata =
alignedChunkMetadata.getTimeChunkMetadata();
+ for (IChunkMetadata iChunkMetadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
+ if (iChunkMetadata != null && iChunkMetadata.isModified()) {
+ timeChunkMetadata.setModified(true);
+ break;
+ }
+ }
+ }
+ }
+
+ public static AlignedChunkMetadata filterAlignedChunkMetadata(
+ AlignedChunkMetadata alignedChunkMetadata, List<String>
selectedMeasurements) {
+ List<IChunkMetadata> valueChunkMetadataList =
+ Arrays.asList(new IChunkMetadata[selectedMeasurements.size()]);
+
+ Map<String, Integer> measurementIndex = new HashMap<>();
+ for (int i = 0; i < selectedMeasurements.size(); i++) {
+ measurementIndex.put(selectedMeasurements.get(i), i);
+ }
+
+ for (IChunkMetadata chunkMetadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
+ if (chunkMetadata == null) {
+ continue;
+ }
+ if (measurementIndex.containsKey(chunkMetadata.getMeasurementUid())) {
+ valueChunkMetadataList.set(
+ measurementIndex.get(chunkMetadata.getMeasurementUid()),
chunkMetadata);
+ }
+ }
+ return new AlignedChunkMetadata(
+ alignedChunkMetadata.getTimeChunkMetadata(), valueChunkMetadataList);
+ }
+
+ public static ModifiedStatus calculateAlignedPageModifiedStatus(
+ long startTime, long endTime, AlignedChunkMetadata
originAlignedChunkMetadata) {
+ ModifiedStatus lastPageStatus = null;
+ for (IChunkMetadata valueChunkMetadata :
+ originAlignedChunkMetadata.getValueChunkMetadataList()) {
+ ModifiedStatus currentPageStatus =
+ valueChunkMetadata == null
+ ? ModifiedStatus.ALL_DELETED
+ : checkIsModified(startTime, endTime,
valueChunkMetadata.getDeleteIntervalList());
+ if (currentPageStatus == ModifiedStatus.PARTIAL_DELETED) {
+ // one of the value pages exist data been deleted partially
+ return ModifiedStatus.PARTIAL_DELETED;
+ }
+ if (lastPageStatus == null) {
+ // first page
+ lastPageStatus = currentPageStatus;
+ continue;
+ }
+ if (!lastPageStatus.equals(currentPageStatus)) {
+ // there are at least two value pages, one is that all data is
deleted, the other is that no
+ // data is deleted
+ lastPageStatus = ModifiedStatus.NONE_DELETED;
+ }
+ }
+ return lastPageStatus;
+ }
+
+ public static ModifiedStatus checkIsModified(
+ long startTime, long endTime, Collection<TimeRange> deletions) {
+ ModifiedStatus status = ModifiedStatus.NONE_DELETED;
+ if (deletions != null) {
+ for (TimeRange range : deletions) {
+ if (range.contains(startTime, endTime)) {
+ // all data on this page or chunk has been deleted
+ return ModifiedStatus.ALL_DELETED;
+ }
+ if (range.overlaps(new TimeRange(startTime, endTime))) {
+ // exist data on this page or chunk been deleted
+ status = ModifiedStatus.PARTIAL_DELETED;
+ }
+ }
+ }
+ return status;
+ }
Review Comment:
Can startTime and endTime be updated inside the loop (line 169) to produce more
precise results?
For example, suppose startTime = 0, endTime = 100, and deletions = [(0,50),
(50,100)].
In the current implementation, this chunk/page will be marked as
PARTIAL_DELETED, but it is actually ALL_DELETED.
If startTime were advanced to 50 after processing the first deletion, the
remaining range (50, 100) would be contained by the second deletion, and the
chunk/page would correctly be detected as ALL_DELETED.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java:
##########
@@ -145,106 +151,112 @@ void
deserializeFileIntoChunkMetadataQueue(List<FileElement> fileElements)
throws IOException, IllegalPathException {
for (FileElement fileElement : fileElements) {
TsFileResource resource = fileElement.resource;
+ List<AlignedChunkMetadata> alignedChunkMetadataList =
getAlignedChunkMetadataList(resource);
- // read time chunk metadatas and value chunk metadatas in the current
file
- List<IChunkMetadata> timeChunkMetadatas = new ArrayList<>();
- List<List<IChunkMetadata>> valueChunkMetadatas = new ArrayList<>();
- for (Map.Entry<String, Map<TsFileResource, Pair<Long, Long>>> entry :
- timeseriesMetadataOffsetMap.entrySet()) {
- String measurementID = entry.getKey();
- Pair<Long, Long> timeseriesOffsetInCurrentFile =
entry.getValue().get(resource);
- if (measurementID.equals("")) {
- // read time chunk metadatas
- if (timeseriesOffsetInCurrentFile == null) {
- // current file does not contain this aligned device
- timeChunkMetadatas = null;
- break;
- }
- timeChunkMetadatas =
+ if (alignedChunkMetadataList.isEmpty()) {
+ // all chunks has been deleted in this file or current file does not
contain this aligned
+ // device, just remove it
+ removeFile(fileElement);
+ }
+
+ // put aligned chunk metadatas into queue
+ for (int i = 0; i < alignedChunkMetadataList.size(); i++) {
+ chunkMetadataQueue.add(
+ new ChunkMetadataElement(
+ alignedChunkMetadataList.get(i),
+ resource.getVersion(),
+ i == alignedChunkMetadataList.size() - 1,
+ fileElement,
+ isBatchedCompaction));
+ }
+ }
+ }
+
+ protected List<AlignedChunkMetadata>
getAlignedChunkMetadataList(TsFileResource resource)
+ throws IOException {
+ // read time chunk metadatas and value chunk metadatas in the current file
+ List<IChunkMetadata> timeChunkMetadatas = new ArrayList<>();
Review Comment:
It seems that timeChunkMetadatas does not need to be created here.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java:
##########
@@ -145,106 +151,112 @@ void
deserializeFileIntoChunkMetadataQueue(List<FileElement> fileElements)
throws IOException, IllegalPathException {
for (FileElement fileElement : fileElements) {
TsFileResource resource = fileElement.resource;
+ List<AlignedChunkMetadata> alignedChunkMetadataList =
getAlignedChunkMetadataList(resource);
- // read time chunk metadatas and value chunk metadatas in the current
file
- List<IChunkMetadata> timeChunkMetadatas = new ArrayList<>();
- List<List<IChunkMetadata>> valueChunkMetadatas = new ArrayList<>();
- for (Map.Entry<String, Map<TsFileResource, Pair<Long, Long>>> entry :
- timeseriesMetadataOffsetMap.entrySet()) {
- String measurementID = entry.getKey();
- Pair<Long, Long> timeseriesOffsetInCurrentFile =
entry.getValue().get(resource);
- if (measurementID.equals("")) {
- // read time chunk metadatas
- if (timeseriesOffsetInCurrentFile == null) {
- // current file does not contain this aligned device
- timeChunkMetadatas = null;
- break;
- }
- timeChunkMetadatas =
+ if (alignedChunkMetadataList.isEmpty()) {
+ // all chunks has been deleted in this file or current file does not
contain this aligned
Review Comment:
has -> have
current file -> the current file
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java:
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.CompactChunkPlan;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.FirstBatchCompactionAlignedChunkWriter;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.FollowingBatchCompactionAlignedChunkWriter;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.FastAlignedSeriesCompactionExecutor;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.PageElement;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.FollowedBatchedCompactionFlushController;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.exception.write.PageException;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class BatchedFastAlignedSeriesCompactionExecutor
+ extends FastAlignedSeriesCompactionExecutor {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+ private final Set<String> compactedMeasurements;
+ private final IMeasurementSchema timeSchema;
+ private final List<IMeasurementSchema> valueMeasurementSchemas;
+ private final List<TsFileResource> sortedSourceFiles;
+
+ private final Map<TsFileResource, List<AlignedChunkMetadata>>
alignedChunkMetadataCache;
+ private final BatchCompactionPlan batchCompactionPlan;
+ private final int batchSize =
+
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
+
+ public BatchedFastAlignedSeriesCompactionExecutor(
+ AbstractCompactionWriter compactionWriter,
+ Map<String, Map<TsFileResource, Pair<Long, Long>>>
timeseriesMetadataOffsetMap,
+ Map<TsFileResource, TsFileSequenceReader> readerCacheMap,
+ Map<String, PatternTreeMap<Modification,
PatternTreeMapFactory.ModsSerializer>>
+ modificationCacheMap,
+ List<TsFileResource> sortedSourceFiles,
+ IDeviceID deviceId,
+ int subTaskId,
+ List<IMeasurementSchema> measurementSchemas,
+ FastCompactionTaskSummary summary) {
+ super(
+ compactionWriter,
+ timeseriesMetadataOffsetMap,
+ readerCacheMap,
+ modificationCacheMap,
+ sortedSourceFiles,
+ deviceId,
+ subTaskId,
+ measurementSchemas,
+ summary);
+ timeSchema = measurementSchemas.remove(0);
+ valueMeasurementSchemas = measurementSchemas;
+ this.compactedMeasurements = new HashSet<>();
+ this.sortedSourceFiles = sortedSourceFiles;
+ this.alignedChunkMetadataCache = new HashMap<>();
+ this.batchCompactionPlan = new BatchCompactionPlan();
+ }
+
+ private List<AlignedChunkMetadata>
getAlignedChunkMetadataListBySelectedValueColumn(
+ TsFileResource tsFileResource, List<IMeasurementSchema>
selectedValueMeasurementSchemas)
+ throws IOException {
+ // 1. get Full AlignedChunkMetadata from cache
+ List<AlignedChunkMetadata> alignedChunkMetadataList = null;
+ if (alignedChunkMetadataCache.containsKey(tsFileResource)) {
+ alignedChunkMetadataList = alignedChunkMetadataCache.get(tsFileResource);
+ } else {
+ alignedChunkMetadataList = getAlignedChunkMetadataList(tsFileResource);
+
AlignedSeriesBatchCompactionUtils.markAlignedChunkHasDeletion(alignedChunkMetadataList);
+ alignedChunkMetadataCache.put(tsFileResource, alignedChunkMetadataList);
+ }
Review Comment:
How about using `Map.computeIfAbsent` here? The `containsKey`/`get`/`put` sequence on `alignedChunkMetadataCache` could then be collapsed into a single call.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils;
+
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.encoding.encoder.Encoder;
+import org.apache.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.PageException;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.statistics.TimeStatistics;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.page.TimePageWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FirstBatchCompactionAlignedChunkWriter extends
AlignedChunkWriterImpl {
+
+ private ChunkWriterFlushCallback beforeChunkWriterFlushCallback;
+
+ public FirstBatchCompactionAlignedChunkWriter(VectorMeasurementSchema
schema) {
+ timeChunkWriter =
+ new FirstBatchCompactionTimeChunkWriter(
+ schema.getMeasurementId(),
+ schema.getCompressor(),
+ schema.getTimeTSEncoding(),
+ schema.getTimeEncoder());
+
+ List<String> valueMeasurementIdList = schema.getSubMeasurementsList();
+ List<TSDataType> valueTSDataTypeList =
schema.getSubMeasurementsTSDataTypeList();
+ List<TSEncoding> valueTSEncodingList =
schema.getSubMeasurementsTSEncodingList();
+ List<Encoder> valueEncoderList = schema.getSubMeasurementsEncoderList();
+
+ valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size());
+ for (int i = 0; i < valueMeasurementIdList.size(); i++) {
+ valueChunkWriterList.add(
+ new ValueChunkWriter(
+ valueMeasurementIdList.get(i),
+ schema.getCompressor(),
+ valueTSDataTypeList.get(i),
+ valueTSEncodingList.get(i),
+ valueEncoderList.get(i)));
+ }
+
+ this.valueIndex = 0;
+ this.remainingPointsNumber =
timeChunkWriter.getRemainingPointNumberForCurrentPage();
+ }
+
+ public FirstBatchCompactionAlignedChunkWriter(
+ IMeasurementSchema timeSchema, List<IMeasurementSchema> valueSchemaList)
{
+ timeChunkWriter =
+ new FirstBatchCompactionTimeChunkWriter(
+ timeSchema.getMeasurementId(),
+ timeSchema.getCompressor(),
+ timeSchema.getEncodingType(),
+ timeSchema.getTimeEncoder());
+
+ valueChunkWriterList = new ArrayList<>(valueSchemaList.size());
+ for (int i = 0; i < valueSchemaList.size(); i++) {
+ valueChunkWriterList.add(
+ new ValueChunkWriter(
+ valueSchemaList.get(i).getMeasurementId(),
+ valueSchemaList.get(i).getCompressor(),
+ valueSchemaList.get(i).getType(),
+ valueSchemaList.get(i).getEncodingType(),
+ valueSchemaList.get(i).getValueEncoder()));
+ }
+
+ this.valueIndex = 0;
+ this.remainingPointsNumber =
timeChunkWriter.getRemainingPointNumberForCurrentPage();
+ }
+
+ public FirstBatchCompactionAlignedChunkWriter(List<IMeasurementSchema>
schemaList) {
+ TSEncoding timeEncoding =
+
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ TSDataType timeType =
TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+ CompressionType timeCompression =
TSFileDescriptor.getInstance().getConfig().getCompressor();
+ timeChunkWriter =
+ new FirstBatchCompactionTimeChunkWriter(
+ "",
+ timeCompression,
+ timeEncoding,
+
TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType));
+
+ valueChunkWriterList = new ArrayList<>(schemaList.size());
+ for (int i = 0; i < schemaList.size(); i++) {
+ valueChunkWriterList.add(
+ new ValueChunkWriter(
+ schemaList.get(i).getMeasurementId(),
+ schemaList.get(i).getCompressor(),
+ schemaList.get(i).getType(),
+ schemaList.get(i).getEncodingType(),
+ schemaList.get(i).getValueEncoder()));
+ }
+
+ this.valueIndex = 0;
+ this.remainingPointsNumber =
timeChunkWriter.getRemainingPointNumberForCurrentPage();
+ }
+
+ @Override
+ public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws
IOException {
+ if (!isEmpty() && beforeChunkWriterFlushCallback != null) {
+ // make sure all page is recorded before this call
Review Comment:
all page is -> all pages are
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java:
##########
@@ -1286,6 +1295,21 @@ private boolean
loadCompactionTaskHotModifiedProps(Properties properties) throws
innerCompactionTaskSelectionModsFileThreshold
!= conf.getInnerCompactionTaskSelectionModsFileThreshold();
+ long compactionMaxAlignedSeriesNumInOneBatch =
+ conf.getCompactionMaxAlignedSeriesNumInOneBatch();
+ int newCompactionMaxAlignedSeriesNumInOneBatch =
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_max_aligned_series_num_in_one_batch",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "compaction_max_aligned_series_num_in_one_batch")));
Review Comment:
Why is `compactionMaxAlignedSeriesNumInOneBatch` a long here?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FollowingBatchCompactionAlignedChunkWriter.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils;
+
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.BatchCompactionCannotAlignedException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.PageException;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.statistics.TimeStatistics;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.page.TimePageWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FollowingBatchCompactionAlignedChunkWriter extends
AlignedChunkWriterImpl {
+ private int currentPage = 0;
+ private CompactChunkPlan compactChunkPlan;
+ private ChunkWriterFlushCallback afterChunkWriterFlushCallback;
+
+ public FollowingBatchCompactionAlignedChunkWriter(
+ IMeasurementSchema timeSchema,
+ List<IMeasurementSchema> valueSchemaList,
+ CompactChunkPlan compactChunkPlan) {
+ timeChunkWriter = new FollowingBatchCompactionTimeChunkWriter();
+
+ valueChunkWriterList = new ArrayList<>(valueSchemaList.size());
+ for (int i = 0; i < valueSchemaList.size(); i++) {
+ valueChunkWriterList.add(
+ new ValueChunkWriter(
+ valueSchemaList.get(i).getMeasurementId(),
+ valueSchemaList.get(i).getCompressor(),
+ valueSchemaList.get(i).getType(),
+ valueSchemaList.get(i).getEncodingType(),
+ valueSchemaList.get(i).getValueEncoder()));
+ }
+ this.valueIndex = 0;
+ this.compactChunkPlan = compactChunkPlan;
+ }
+
+ @Override
+ protected boolean checkPageSizeAndMayOpenANewPage() {
+ long endTime =
+ ((FollowingBatchCompactionTimeChunkWriter)
timeChunkWriter).chunkStatistics.getEndTime();
+ return endTime ==
compactChunkPlan.getPageRecords().get(currentPage).getTimeRange().getMax();
+ }
+
+ @Override
+ public void sealCurrentPage() {
+ writePageToPageBuffer();
+ }
+
+ @Override
+ protected void writePageToPageBuffer() {
+ FollowingBatchCompactionTimeChunkWriter
followingBatchCompactionTimeChunkWriter =
+ (FollowingBatchCompactionTimeChunkWriter) timeChunkWriter;
+ TimeStatistics pageStatistics =
followingBatchCompactionTimeChunkWriter.pageStatistics;
+ if (pageStatistics.isEmpty()) {
+ return;
+ }
+ CompactPagePlan alignedTimePage =
compactChunkPlan.getPageRecords().get(currentPage);
+ if (alignedTimePage.getTimeRange().getMax() !=
pageStatistics.getEndTime()) {
+ throw new BatchCompactionCannotAlignedException(
+ pageStatistics, compactChunkPlan, currentPage);
+ }
+ super.writePageToPageBuffer();
+ currentPage++;
+ }
+
+ @Override
+ public void writePageHeaderAndDataIntoTimeBuff(ByteBuffer data, PageHeader
header)
+ throws PageException {
+ if (currentPage >= compactChunkPlan.getPageRecords().size()
+ || header.getStatistics().getStartTime()
+ !=
compactChunkPlan.getPageRecords().get(currentPage).getTimeRange().getMin()) {
+ throw new BatchCompactionCannotAlignedException(header,
compactChunkPlan, currentPage);
+ }
+ super.writePageHeaderAndDataIntoTimeBuff(data, header);
+ currentPage++;
+ }
+
+ @Override
+ public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws
IOException {
+ if (isEmpty()) {
+ return;
+ }
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.writeToFileWriter(tsfileWriter);
+ }
+ if (afterChunkWriterFlushCallback != null) {
+ afterChunkWriterFlushCallback.call(this);
+ }
+ }
+
+ @Override
+ public boolean checkIsChunkSizeOverThreshold(
+ long size, long pointNum, boolean returnTrueIfChunkEmpty) {
+ if (compactChunkPlan.isCompactedByDirectlyFlush()) {
+ return true;
+ }
+ return currentPage >= compactChunkPlan.getPageRecords().size()
+ || ((FollowingBatchCompactionTimeChunkWriter)
timeChunkWriter).chunkStatistics.getEndTime()
+ == compactChunkPlan.getTimeRange().getMax();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return timeChunkWriter.getPointNum() == 0;
+ }
+
+ @Override
+ public boolean checkIsUnsealedPageOverThreshold(
+ long size, long pointNum, boolean returnTrueIfPageEmpty) {
+ if (currentPage >= compactChunkPlan.getPageRecords().size()) {
+ return true;
+ }
+ CompactPagePlan compactPagePlan =
compactChunkPlan.getPageRecords().get(currentPage);
+ if (compactPagePlan.isCompactedByDirectlyFlush()) {
+ return true;
+ }
+ long endTime =
+ ((FollowingBatchCompactionTimeChunkWriter)
timeChunkWriter).pageStatistics.getEndTime();
+ return endTime ==
compactChunkPlan.getPageRecords().get(currentPage).getTimeRange().getMax();
+ }
+
+ public int getCurrentPage() {
+ return currentPage;
+ }
+
+ public void setCompactChunkPlan(CompactChunkPlan compactChunkPlan) {
+ this.compactChunkPlan = compactChunkPlan;
+ this.currentPage = 0;
+ this.timeChunkWriter = new FollowingBatchCompactionTimeChunkWriter();
+ }
+
+ public void registerAfterFlushChunkWriterCallback(
+ ChunkWriterFlushCallback flushChunkWriterCallback) {
+ this.afterChunkWriterFlushCallback = flushChunkWriterCallback;
+ }
+
+ public static class FollowingBatchCompactionTimeChunkWriter extends
TimeChunkWriter {
+ private TimeStatistics chunkStatistics;
+ private TimeStatistics pageStatistics;
+
+ public FollowingBatchCompactionTimeChunkWriter() {
+ chunkStatistics = new TimeStatistics();
+ pageStatistics = new TimeStatistics();
+ }
+
+ @Override
+ public void write(long time) {
+ chunkStatistics.update(time);
+ pageStatistics.update(time);
+ }
+
+ @Override
+ public void write(long[] timestamps, int batchSize, int arrayOffset) {
+ throw new RuntimeException("unimplemented");
+ }
+
+ @Override
+ public boolean checkPageSizeAndMayOpenANewPage() {
+ throw new RuntimeException("unimplemented");
+ }
+
+ @Override
+ public long getRemainingPointNumberForCurrentPage() {
+ throw new RuntimeException("unimplemented");
+ }
+
+ @Override
+ public void writePageToPageBuffer() {
+ pageStatistics = new TimeStatistics();
+ }
+
+ @Override
+ public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader
header)
+ throws PageException {
+ if (data == null || header.getStatistics() == null ||
header.getStatistics().isEmpty()) {
+ return;
+ }
+ chunkStatistics.mergeStatistics(header.getStatistics());
+ }
+
+ @Override
+ public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws
IOException {
+ chunkStatistics = new TimeStatistics();
+ pageStatistics = new TimeStatistics();
+ }
+
+ @Override
+ public long estimateMaxSeriesMemSize() {
+ return super.estimateMaxSeriesMemSize();
+ }
Review Comment:
Unnecessary, it can be removed.
##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template:
##########
@@ -1270,6 +1270,12 @@ min_cross_compaction_unseq_file_level=1
# Datatype: int
compaction_thread_count=10
+# How many chunk will be compact in aligned series compaction, 10 by default.
+# Set to Integer.MAX_VALUE when less than or equal to 0.
+# effectiveMode: hot_reload
+# Datatype: int
+compaction_max_aligned_series_num_in_one_batch=10
Review Comment:
chunk -> chunks
compact -> compacted
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java:
##########
@@ -76,4 +83,18 @@ public long calculatingDataMemoryCost(CompactionTaskInfo
taskInfo) throws IOExce
+ maxConcurrentChunkSizeFromSourceFile
+ taskInfo.getModificationFileSize();
}
+
+ @Override
+ public long roughEstimateInnerCompactionMemory(List<TsFileResource>
resources)
+ throws IOException {
+ int maxConcurrentSeriesNum =
+ Math.max(
+ config.getCompactionMaxAlignedSeriesNumInOneBatch(),
config.getSubCompactionTaskNum());
+ long maxChunkSize = config.getTargetChunkSize();
+ long maxPageSize = tsFileConfig.getPageSizeInByte();
+ int maxOverlapFileNum =
calculatingMaxOverlapFileNumInSubCompactionTask(resources);
+ // source files (chunk + uncompressed page) * overlap file num
+ // target file (chunk + unsealed page writer)
+ return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize +
maxPageSize);
Review Comment:
Will inner compaction have overlapping files?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java:
##########
@@ -79,12 +82,15 @@ public boolean flushNonAlignedChunk(Chunk chunk,
ChunkMetadata chunkMetadata, in
}
@Override
- public boolean flushAlignedChunk(
- Chunk timeChunk,
- IChunkMetadata timeChunkMetadata,
- List<Chunk> valueChunks,
- List<IChunkMetadata> valueChunkMetadatas,
- int subTaskId) {
+ public boolean flushAlignedChunk(ChunkMetadataElement chunkMetadataElement,
int subTaskId) {
+ throw new RuntimeException("Does not support this method in
ReadPointCrossCompactionWriter");
Review Comment:
May use UnsupportedOperationException.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]