This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fdd72bcd67 [IOTDB-4544] Implement tsblock split for schema query
(#7532)
fdd72bcd67 is described below
commit fdd72bcd677d9e8180ea423e8f518fbef5c0fca8
Author: Marcos_Zyk <[email protected]>
AuthorDate: Fri Oct 7 14:36:24 2022 +0800
[IOTDB-4544] Implement tsblock split for schema query (#7532)
---
.../operator/schema/CountMergeOperator.java | 90 ++++++++++++----------
.../operator/schema/DevicesSchemaScanOperator.java | 16 ++--
.../schema/LevelTimeSeriesCountOperator.java | 50 ++++++++----
.../schema/PathsUsingTemplateScanOperator.java | 14 ++--
.../operator/schema/SchemaQueryScanOperator.java | 31 ++++----
.../operator/schema/SchemaTsBlockUtil.java | 57 ++++++++++++++
.../schema/TimeSeriesSchemaScanOperator.java | 16 ++--
.../operator/schema/CountMergeOperatorTest.java | 2 +
8 files changed, 179 insertions(+), 97 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index 7524b7c455..9dd3d15581 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -35,21 +35,25 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static com.google.common.util.concurrent.Futures.successfulAsList;
+import java.util.NoSuchElementException;
public class CountMergeOperator implements ProcessOperator {
private final PlanNodeId planNodeId;
private final OperatorContext operatorContext;
- private boolean isFinished;
+
+ private List<TsBlock> tsBlockList = new ArrayList<>();
+ private int currentIndex = 0;
private final List<Operator> children;
+ private final boolean isGroupByLevel;
+
public CountMergeOperator(
PlanNodeId planNodeId, OperatorContext operatorContext, List<Operator>
children) {
this.planNodeId = planNodeId;
this.operatorContext = operatorContext;
this.children = children;
+ isGroupByLevel = children.get(0) instanceof LevelTimeSeriesCountOperator;
}
@Override
@@ -59,34 +63,46 @@ public class CountMergeOperator implements ProcessOperator {
@Override
public ListenableFuture<?> isBlocked() {
- List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (Operator child : children) {
- ListenableFuture<?> blocked = child.isBlocked();
- if (!blocked.isDone()) {
- listenableFutures.add(blocked);
+ while (!child.isFinished()) {
+ ListenableFuture<?> blocked = child.isBlocked();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ if (child.hasNext()) {
+ TsBlock tsBlock = child.next();
+ if (null != tsBlock && !tsBlock.isEmpty()) {
+ tsBlockList.add(tsBlock);
+ }
+ }
}
}
- return listenableFutures.isEmpty() ? NOT_BLOCKED :
successfulAsList(listenableFutures);
+
+ generateResultTsBlockList();
+
+ return NOT_BLOCKED;
}
@Override
public TsBlock next() {
- isFinished = true;
- if (children.get(0) instanceof LevelTimeSeriesCountOperator) {
- return nextWithGroupByLevel();
+ if (!hasNext()) {
+ throw new NoSuchElementException();
}
- return nextWithoutGroupByLevel();
+ currentIndex++;
+ return tsBlockList.get(currentIndex - 1);
}
- private TsBlock nextWithoutGroupByLevel() {
- List<TsBlock> tsBlocks = new ArrayList<>(children.size());
- for (Operator child : children) {
- if (child.hasNext()) {
- tsBlocks.add(child.next());
- }
+ private void generateResultTsBlockList() {
+ if (isGroupByLevel) {
+ generateResultWithGroupByLevel();
+ } else {
+ generateResultWithoutGroupByLevel();
}
+ }
+
+ private void generateResultWithoutGroupByLevel() {
int totalCount = 0;
- for (TsBlock tsBlock : tsBlocks) {
+ for (TsBlock tsBlock : tsBlockList) {
int count = tsBlock.getColumn(0).getInt(0);
totalCount += count;
}
@@ -94,44 +110,38 @@ public class CountMergeOperator implements ProcessOperator
{
tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
tsBlockBuilder.getColumnBuilder(0).writeInt(totalCount);
tsBlockBuilder.declarePosition();
- return tsBlockBuilder.build();
+ this.tsBlockList = Collections.singletonList(tsBlockBuilder.build());
}
- private TsBlock nextWithGroupByLevel() {
- List<TsBlock> tsBlocks = new ArrayList<>(children.size());
- for (Operator child : children) {
- if (child.hasNext()) {
- tsBlocks.add(child.next());
- }
- }
- TsBlockBuilder tsBlockBuilder =
- new TsBlockBuilder(Arrays.asList(TSDataType.TEXT, TSDataType.INT32));
+ private void generateResultWithGroupByLevel() {
Map<String, Integer> countMap = new HashMap<>();
- for (TsBlock tsBlock : tsBlocks) {
+ for (TsBlock tsBlock : tsBlockList) {
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
String columnName = tsBlock.getColumn(0).getBinary(i).getStringValue();
int count = tsBlock.getColumn(1).getInt(i);
countMap.put(columnName, countMap.getOrDefault(columnName, 0) + count);
}
}
- countMap.forEach(
- (column, count) -> {
- tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
- tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(column));
- tsBlockBuilder.getColumnBuilder(1).writeInt(count);
- tsBlockBuilder.declarePosition();
- });
- return tsBlockBuilder.build();
+ this.tsBlockList =
+ SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+ countMap.entrySet().iterator(),
+ Arrays.asList(TSDataType.TEXT, TSDataType.INT32),
+ (entry, tsBlockBuilder) -> {
+ tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+ tsBlockBuilder.getColumnBuilder(0).writeBinary(new
Binary(entry.getKey()));
+ tsBlockBuilder.getColumnBuilder(1).writeInt(entry.getValue());
+ tsBlockBuilder.declarePosition();
+ });
}
@Override
public boolean hasNext() {
- return !isFinished;
+ return currentIndex < tsBlockList.size();
}
@Override
public boolean isFinished() {
- return isFinished;
+ return !hasNext();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesSchemaScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesSchemaScanOperator.java
index fc8a3993e5..9eacd6e8cb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesSchemaScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesSchemaScanOperator.java
@@ -57,18 +57,18 @@ public class DevicesSchemaScanOperator extends
SchemaQueryScanOperator {
}
@Override
- protected TsBlock createTsBlock() {
- TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ protected List<TsBlock> createTsBlockList() {
try {
- ((SchemaDriverContext)
operatorContext.getInstanceContext().getDriverContext())
- .getSchemaRegion()
- .getMatchedDevices(convertToPhysicalPlan())
- .left
- .forEach(device -> setColumns(device, builder));
+ List<ShowDevicesResult> schemaRegionResult =
+ ((SchemaDriverContext)
operatorContext.getInstanceContext().getDriverContext())
+ .getSchemaRegion()
+ .getMatchedDevices(convertToPhysicalPlan())
+ .left;
+ return SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+ schemaRegionResult.iterator(), outputDataTypes, this::setColumns);
} catch (MetadataException e) {
throw new RuntimeException(e.getMessage(), e);
}
- return builder.build();
}
// ToDo @xinzhongtianxia remove this temporary converter after mpp online
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index aa5d865014..f1ce0e88e6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -29,11 +29,11 @@ import
org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
@@ -48,7 +48,8 @@ public class LevelTimeSeriesCountOperator implements
SourceOperator {
private final String value;
private final boolean isContains;
- private boolean isFinished;
+ private List<TsBlock> tsBlockList;
+ private int currentIndex = 0;
private final List<TSDataType> outputDataTypes;
@@ -87,8 +88,23 @@ public class LevelTimeSeriesCountOperator implements
SourceOperator {
@Override
public TsBlock next() {
- isFinished = true;
- TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ currentIndex++;
+ return tsBlockList.get(currentIndex - 1);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (tsBlockList == null) {
+ createTsBlockList();
+ }
+
+ return currentIndex < tsBlockList.size();
+ }
+
+ public void createTsBlockList() {
Map<PartialPath, Integer> countMap;
try {
if (key != null && value != null) {
@@ -107,24 +123,24 @@ public class LevelTimeSeriesCountOperator implements
SourceOperator {
} catch (MetadataException e) {
throw new RuntimeException(e.getMessage(), e);
}
- countMap.forEach(
- (path, count) -> {
- tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
- tsBlockBuilder.getColumnBuilder(0).writeBinary(new
Binary(path.getFullPath()));
- tsBlockBuilder.getColumnBuilder(1).writeInt(count);
- tsBlockBuilder.declarePosition();
- });
- return tsBlockBuilder.build();
- }
- @Override
- public boolean hasNext() {
- return !isFinished;
+ tsBlockList =
+ SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+ countMap.entrySet().iterator(),
+ outputDataTypes,
+ (entry, tsBlockBuilder) -> {
+ tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+ tsBlockBuilder
+ .getColumnBuilder(0)
+ .writeBinary(new Binary(entry.getKey().getFullPath()));
+ tsBlockBuilder.getColumnBuilder(1).writeInt(entry.getValue());
+ tsBlockBuilder.declarePosition();
+ });
}
@Override
public boolean isFinished() {
- return isFinished;
+ return !hasNext();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/PathsUsingTemplateScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/PathsUsingTemplateScanOperator.java
index b65dade6de..0c8a9a2b4e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/PathsUsingTemplateScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/PathsUsingTemplateScanOperator.java
@@ -50,17 +50,17 @@ public class PathsUsingTemplateScanOperator extends
SchemaQueryScanOperator {
}
@Override
- protected TsBlock createTsBlock() {
- TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ protected List<TsBlock> createTsBlockList() {
try {
- ((SchemaDriverContext)
operatorContext.getInstanceContext().getDriverContext())
- .getSchemaRegion()
- .getPathsUsingTemplate(templateId)
- .forEach(path -> setColumns(path, builder));
+ List<String> schemaRegionResult =
+ ((SchemaDriverContext)
operatorContext.getInstanceContext().getDriverContext())
+ .getSchemaRegion()
+ .getPathsUsingTemplate(templateId);
+ return SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+ schemaRegionResult.iterator(), outputDataTypes, this::setColumns);
} catch (MetadataException e) {
throw new RuntimeException(e.getMessage(), e);
}
- return builder.build();
}
private void setColumns(String path, TsBlockBuilder builder) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index 4493dc58a8..23bbcd4366 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -24,13 +24,16 @@ import
org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import java.util.List;
+import java.util.NoSuchElementException;
+
import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public abstract class SchemaQueryScanOperator implements SourceOperator {
protected OperatorContext operatorContext;
- protected TsBlock tsBlock;
- protected boolean isFinished = false;
+ protected List<TsBlock> tsBlockList;
+ protected int currentIndex = 0;
protected int limit;
protected int offset;
@@ -54,7 +57,7 @@ public abstract class SchemaQueryScanOperator implements
SourceOperator {
this.sourceId = sourceId;
}
- protected abstract TsBlock createTsBlock();
+ protected abstract List<TsBlock> createTsBlockList();
public PartialPath getPartialPath() {
return partialPath;
@@ -87,25 +90,19 @@ public abstract class SchemaQueryScanOperator implements
SourceOperator {
@Override
public TsBlock next() {
- isFinished = true;
- TsBlock result = tsBlock;
- tsBlock = null;
- return result;
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ currentIndex++;
+ return tsBlockList.get(currentIndex - 1);
}
@Override
public boolean hasNext() {
- if (isFinished) {
- return false;
- }
- if (tsBlock == null) {
- tsBlock = createTsBlock();
- if (tsBlock.getPositionCount() == 0) {
- isFinished = true;
- return false;
- }
+ if (tsBlockList == null) {
+ tsBlockList = createTsBlockList();
}
- return true;
+ return currentIndex < tsBlockList.size();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaTsBlockUtil.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaTsBlockUtil.java
new file mode 100644
index 0000000000..52b11fca50
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaTsBlockUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.schema;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class SchemaTsBlockUtil {
+
+ private static final long MAX_SIZE = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+ public static <T> List<TsBlock> transferSchemaResultToTsBlockList(
+ Iterator<T> schemaRegionResultIterator,
+ List<TSDataType> outputDataTypes,
+ BiConsumer<T, TsBlockBuilder> consumer) {
+ List<TsBlock> result = new ArrayList<>();
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+ T schemaRegionResultElement;
+ while (schemaRegionResultIterator.hasNext()) {
+ schemaRegionResultElement = schemaRegionResultIterator.next();
+ consumer.accept(schemaRegionResultElement, tsBlockBuilder);
+ if (tsBlockBuilder.getRetainedSizeInBytes() >= MAX_SIZE) {
+ result.add(tsBlockBuilder.build());
+ tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+ }
+ }
+ if (!tsBlockBuilder.isEmpty()) {
+ result.add(tsBlockBuilder.build());
+ }
+ return result;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
index ab8a49e1b8..7fbc42f88f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
@@ -89,18 +89,18 @@ public class TimeSeriesSchemaScanOperator extends
SchemaQueryScanOperator {
}
@Override
- protected TsBlock createTsBlock() {
- TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ protected List<TsBlock> createTsBlockList() {
try {
- ((SchemaDriverContext)
operatorContext.getInstanceContext().getDriverContext())
- .getSchemaRegion()
- .showTimeseries(convertToPhysicalPlan(),
operatorContext.getInstanceContext())
- .left
- .forEach(series -> setColumns(series, builder));
+ List<ShowTimeSeriesResult> schemaRegionResult =
+ ((SchemaDriverContext)
operatorContext.getInstanceContext().getDriverContext())
+ .getSchemaRegion()
+ .showTimeseries(convertToPhysicalPlan(),
operatorContext.getInstanceContext())
+ .left;
+ return SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+ schemaRegionResult.iterator(), outputDataTypes, this::setColumns);
} catch (MetadataException e) {
throw new RuntimeException(e.getMessage(), e);
}
- return builder.build();
}
// ToDo @xinzhongtianxia remove this temporary converter after mpp online
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java
index d0e63e49bb..97838bd851 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -187,6 +188,7 @@ public class CountMergeOperatorTest {
fragmentInstanceContext.getOperatorContexts().get(0),
Arrays.asList(timeSeriesCountOperator1,
timeSeriesCountOperator2));
TsBlock tsBlock = null;
+ Assert.assertTrue(countMergeOperator.isBlocked().isDone());
while (countMergeOperator.hasNext()) {
tsBlock = countMergeOperator.next();
assertFalse(countMergeOperator.hasNext());