This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/sonar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/sonar by this push:
new 3496ab12d22
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process done
except for fill, join and last
3496ab12d22 is described below
commit 3496ab12d224b7c97d667e876d60f4f236c9e874
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jun 20 21:14:14 2023 +0800
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process
done except for fill, join and last
---
.../process/AbstractConsumeAllOperator.java | 1 +
.../operator/process/AbstractIntoOperator.java | 10 +-
.../operator/process/AggregationOperator.java | 1 +
.../operator/process/DeviceMergeOperator.java | 316 ---------------------
.../operator/process/DeviceViewIntoOperator.java | 7 +-
.../operator/process/DeviceViewOperator.java | 1 +
.../execution/operator/process/FillOperator.java | 3 +-
.../operator/process/FilterAndProjectOperator.java | 45 ++-
.../execution/operator/process/IntoOperator.java | 1 +
.../execution/operator/process/LimitOperator.java | 1 +
.../operator/process/LinearFillOperator.java | 16 +-
.../operator/process/MergeSortOperator.java | 11 +-
.../execution/operator/process/OffsetOperator.java | 1 +
.../operator/process/ProcessOperator.java | 2 +-
.../process/RawDataAggregationOperator.java | 10 +-
.../process/SingleInputAggregationOperator.java | 1 +
.../process/SlidingWindowAggregationOperator.java | 14 +-
.../execution/operator/process/SortOperator.java | 47 +--
.../operator/process/TagAggregationOperator.java | 68 +++--
.../operator/process/TransformOperator.java | 45 +--
.../operator/source/AlignedSeriesScanUtil.java | 12 +-
.../execution/operator/source/SeriesScanUtil.java | 48 +++-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 36 ---
.../mpp/execution/operator/OperatorMemoryTest.java | 49 ----
24 files changed, 225 insertions(+), 521 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
index c13415d6d12..170c6d7035b 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
@@ -81,6 +81,7 @@ public abstract class AbstractConsumeAllOperator extends
AbstractOperator
*
* @return true if results of all children are ready. Return false if some
children is blocked or
* return null.
+ * @throws Exception errors happened while getting tsblock from children
*/
protected boolean prepareInput() throws Exception {
boolean allReady = true;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 8816776d88f..ed618dcc173 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -109,13 +109,12 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
if (memAllowedMaxRowNumber > Integer.MAX_VALUE) {
memAllowedMaxRowNumber = Integer.MAX_VALUE;
}
- int maxRowNumberInStatement =
+ this.maxRowNumberInStatement =
Math.min(
(int) memAllowedMaxRowNumber,
IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit());
long maxStatementSize = maxRowNumberInStatement * statementSizePerLine;
- this.maxRowNumberInStatement = maxRowNumberInStatement;
this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
}
@@ -175,6 +174,9 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
/**
* Check whether the last write operation was executed successfully, and
throw an exception if the
* execution failed, otherwise continue to execute the operator.
+ *
+ * @throws IntoProcessException wrap InterruptedException with
IntoProcessException while
+ * Interruption happened
*/
private void checkLastWriteOperation() {
if (writeOperationFuture == null) {
@@ -203,8 +205,6 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
writeOperationFuture = null;
} catch (InterruptedException e) {
- LOGGER.warn(
- "{}: interrupted when processing write operation future with
exception {}", this, e);
Thread.currentThread().interrupt();
throw new IntoProcessException(e.getMessage());
} catch (ExecutionException e) {
@@ -249,7 +249,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
/** Return true if write task is submitted successfully. */
protected boolean insertMultiTabletsInternally(boolean needCheck) {
- InsertMultiTabletsStatement insertMultiTabletsStatement =
+ final InsertMultiTabletsStatement insertMultiTabletsStatement =
constructInsertMultiTabletsStatement(needCheck);
if (insertMultiTabletsStatement == null) {
return false;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 0102c0be073..0b50b6c8ebb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
deleted file mode 100644
index cb7630dea2f..00000000000
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.execution.operator.process;
-
-import org.apache.iotdb.db.mpp.execution.operator.Operator;
-import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import
org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.google.common.util.concurrent.Futures.successfulAsList;
-
-/**
- * DeviceMergeOperator is responsible for merging tsBlock coming from
DeviceViewOperators.
- *
- * <p>If the devices in different dataNodes are different, we need to output
tsBlocks of each node
- * in order of device. If the same device exists in different nodes, the
tsBlocks need to be merged
- * by time within the device.
- *
- * <p>The form of tsBlocks from input operators should be the same strictly,
which is transferred by
- * DeviceViewOperator.
- */
-@Deprecated
-public class DeviceMergeOperator implements ProcessOperator {
-
- private final OperatorContext operatorContext;
- private final List<String> devices;
- private final List<Operator> deviceOperators;
- private final List<TSDataType> dataTypes;
- private final TsBlockBuilder tsBlockBuilder;
-
- private final int inputOperatorsCount;
- private final TsBlock[] inputTsBlocks;
- // device name of inputTsBlocks[], e.g. d1 in tsBlock1, d2 in tsBlock2
- private final String[] deviceOfInputTsBlocks;
- private final boolean[] noMoreTsBlocks;
-
- private int curDeviceIndex;
- // the index of curDevice in inputTsBlocks
- private LinkedList<Integer> curDeviceTsBlockIndexList = new LinkedList<>();
-
- private boolean finished;
-
- private final TimeSelector timeSelector;
- private final TimeComparator comparator;
-
- public DeviceMergeOperator(
- OperatorContext operatorContext,
- List<String> devices,
- List<Operator> deviceOperators,
- List<TSDataType> dataTypes,
- TimeSelector selector,
- TimeComparator comparator) {
- this.operatorContext = operatorContext;
- this.devices = devices;
- this.deviceOperators = deviceOperators;
- this.inputOperatorsCount = deviceOperators.size();
- this.inputTsBlocks = new TsBlock[inputOperatorsCount];
- this.deviceOfInputTsBlocks = new String[inputOperatorsCount];
- this.noMoreTsBlocks = new boolean[inputOperatorsCount];
- this.dataTypes = dataTypes;
- this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
- this.timeSelector = selector;
- this.comparator = comparator;
- }
-
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public ListenableFuture<?> isBlocked() {
- List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) {
- ListenableFuture<?> blocked = deviceOperators.get(i).isBlocked();
- if (!blocked.isDone()) {
- listenableFutures.add(blocked);
- }
- }
- }
- return listenableFutures.isEmpty() ? NOT_BLOCKED :
successfulAsList(listenableFutures);
- }
-
- @Override
- public TsBlock next() throws Exception {
- // get new input TsBlock
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) &&
deviceOperators.get(i).hasNextWithTimer()) {
- inputTsBlocks[i] = deviceOperators.get(i).nextWithTimer();
- if (inputTsBlocks[i] == null || inputTsBlocks[i].isEmpty()) {
- return null;
- }
- deviceOfInputTsBlocks[i] = getDeviceNameFromTsBlock(inputTsBlocks[i]);
- tryToAddCurDeviceTsBlockList(i);
- }
- }
- // move to next device
- while (curDeviceTsBlockIndexList.isEmpty() && curDeviceIndex + 1 <
devices.size()) {
- getNextDeviceTsBlocks();
- }
- // process the curDeviceTsBlocks
- if (curDeviceTsBlockIndexList.size() == 1) {
- TsBlock resultTsBlock = inputTsBlocks[curDeviceTsBlockIndexList.get(0)];
- inputTsBlocks[curDeviceTsBlockIndexList.get(0)] = null;
- curDeviceTsBlockIndexList.clear();
- return resultTsBlock;
- } else {
- tsBlockBuilder.reset();
- int tsBlockSizeOfCurDevice = curDeviceTsBlockIndexList.size();
- TsBlock[] deviceTsBlocks = new TsBlock[tsBlockSizeOfCurDevice];
- TsBlockSingleColumnIterator[] tsBlockIterators =
- new TsBlockSingleColumnIterator[tsBlockSizeOfCurDevice];
- for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
- deviceTsBlocks[i] = inputTsBlocks[curDeviceTsBlockIndexList.get(i)];
- tsBlockIterators[i] =
deviceTsBlocks[i].getTsBlockSingleColumnIterator();
- }
- // Use the min end time of all tsBlocks as the end time of result tsBlock
- // i.e. only one tsBlock will be consumed totally
- long currentEndTime = deviceTsBlocks[0].getEndTime();
- for (int i = 1; i < tsBlockSizeOfCurDevice; i++) {
- currentEndTime =
- comparator.getCurrentEndTime(currentEndTime,
deviceTsBlocks[i].getEndTime());
- }
-
- TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
- ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
- while (!timeSelector.isEmpty()
- && comparator.satisfyCurEndTime(timeSelector.first(),
currentEndTime)) {
- long timestamp = timeSelector.pollFirst();
- timeBuilder.writeLong(timestamp);
- // TODO process by column
- // Try to find the tsBlock that timestamp belongs to
- for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
- // TODO the same timestamp in different data region
- if (tsBlockIterators[i].hasNext() &&
tsBlockIterators[i].currentTime() == timestamp) {
- int rowIndex = tsBlockIterators[i].getRowIndex();
- for (int j = 0; j < valueColumnBuilders.length; j++) {
- // the jth column of rowIndex of ith tsBlock
- if (deviceTsBlocks[i].getColumn(j).isNull(rowIndex)) {
- valueColumnBuilders[j].appendNull();
- continue;
- }
- valueColumnBuilders[j].write(deviceTsBlocks[i].getColumn(j),
rowIndex);
- }
- tsBlockIterators[i].next();
- break;
- }
- }
- tsBlockBuilder.declarePosition();
- }
- // update tsBlock after consuming
- int consumedTsBlockIndex = 0;
- for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
- if (tsBlockIterators[i].hasNext()) {
- int rowIndex = tsBlockIterators[i].getRowIndex();
- inputTsBlocks[curDeviceTsBlockIndexList.get(i)] =
-
inputTsBlocks[curDeviceTsBlockIndexList.get(i)].subTsBlock(rowIndex);
- } else {
- inputTsBlocks[curDeviceTsBlockIndexList.get(i)] = null;
- consumedTsBlockIndex = i;
- }
- }
- curDeviceTsBlockIndexList.remove(consumedTsBlockIndex);
- return tsBlockBuilder.build();
- }
- }
-
- @Override
- public boolean hasNext() throws Exception {
- if (finished) {
- return false;
- }
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (!isTsBlockEmpty(i)) {
- return true;
- } else if (!noMoreTsBlocks[i]) {
- if (deviceOperators.get(i).hasNextWithTimer()) {
- return true;
- } else {
- noMoreTsBlocks[i] = true;
- inputTsBlocks[i] = null;
- }
- }
- }
- return false;
- }
-
- @Override
- public void close() throws Exception {
- for (Operator deviceOperator : deviceOperators) {
- deviceOperator.close();
- }
- }
-
- @Override
- public boolean isFinished() throws Exception {
- if (finished) {
- return true;
- }
- finished = true;
-
- for (int i = 0; i < inputOperatorsCount; i++) {
- // has more tsBlock output from children[i] or has cached tsBlock in
inputTsBlocks[i]
- if (!noMoreTsBlocks[i] || !isTsBlockEmpty(i)) {
- finished = false;
- break;
- }
- }
- return finished;
- }
-
- /** DeviceColumn must be the first value column of tsBlock transferred by
DeviceViewOperator. */
- private String getDeviceNameFromTsBlock(TsBlock tsBlock) {
- if (tsBlock == null || tsBlock.getPositionCount() == 0 ||
tsBlock.getColumn(0).isNull(0)) {
- return null;
- }
- return tsBlock.getColumn(0).getBinary(0).toString();
- }
-
- private String getCurDeviceName() {
- return devices.get(curDeviceIndex);
- }
-
- private void getNextDeviceTsBlocks() {
- curDeviceIndex++;
- for (int i = 0; i < inputOperatorsCount; i++) {
- tryToAddCurDeviceTsBlockList(i);
- }
- }
-
- private void tryToAddCurDeviceTsBlockList(int tsBlockIndex) {
- if (deviceOfInputTsBlocks[tsBlockIndex] != null
- && deviceOfInputTsBlocks[tsBlockIndex].equals(getCurDeviceName())) {
- // add tsBlock of curDevice to a list
- curDeviceTsBlockIndexList.add(tsBlockIndex);
- // add all timestamp of curDevice to timeSelector
- int rowSize = inputTsBlocks[tsBlockIndex].getPositionCount();
- for (int row = 0; row < rowSize; row++) {
- timeSelector.add(inputTsBlocks[tsBlockIndex].getTimeByIndex(row));
- }
- }
- }
-
- /**
- * If the tsBlock of tsBlockIndex is null or has no more data in the
tsBlock, return true; else
- * return false;
- */
- private boolean isTsBlockEmpty(int tsBlockIndex) {
- return inputTsBlocks[tsBlockIndex] == null
- || inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- // timeSelector will cache time, we use a single time column to represent
max memory cost
- long maxPeekMemory =
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- // inputTsBlocks will cache all TsBlocks returned by deviceOperators
- for (Operator operator : deviceOperators) {
- maxPeekMemory += operator.calculateMaxReturnSize();
- maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
- }
- for (Operator operator : deviceOperators) {
- maxPeekMemory = Math.max(maxPeekMemory,
operator.calculateMaxPeekMemory());
- }
- return Math.max(maxPeekMemory, calculateMaxReturnSize());
- }
-
- @Override
- public long calculateMaxReturnSize() {
- // time + all value columns
- return (1L + dataTypes.size()) *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
- for (Operator child : deviceOperators) {
- long maxReturnSize = child.calculateMaxReturnSize();
- currentRetainedSize += (maxReturnSize +
child.calculateRetainedSizeAfterCallingNext());
- minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
- }
- // max cached TsBlock
- return currentRetainedSize - minChildReturnSize;
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 32a0959d80f..95f7c4ce5fe 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -54,6 +54,7 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
private final TsBlockBuilder resultTsBlockBuilder;
+ @SuppressWarnings("squid:S107")
public DeviceViewIntoOperator(
OperatorContext operatorContext,
Operator child,
@@ -96,7 +97,7 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
String device =
String.valueOf(inputTsBlock.getValueColumns()[deviceColumnIndex].getBinary(0));
if (!Objects.equals(device, currentDevice)) {
- InsertMultiTabletsStatement insertMultiTabletsStatement =
+ final InsertMultiTabletsStatement insertMultiTabletsStatement =
constructInsertMultiTabletsStatement(false);
updateResultTsBlock();
@@ -113,7 +114,7 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
int readIndex = 0;
while (readIndex < inputTsBlock.getPositionCount()) {
int lastReadIndex = readIndex;
- for (IntoOperator.InsertTabletStatementGenerator generator :
+ for (AbstractIntoOperator.InsertTabletStatementGenerator generator :
insertTabletStatementGenerators) {
lastReadIndex = Math.max(lastReadIndex,
generator.processTsBlock(inputTsBlock, readIndex));
}
@@ -142,7 +143,7 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
return resultTsBlockBuilder.build();
}
- private List<IntoOperator.InsertTabletStatementGenerator>
+ private List<AbstractIntoOperator.InsertTabletStatementGenerator>
constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
Map<PartialPath, Map<String, InputLocation>>
targetPathToSourceInputLocationMap =
deviceToTargetPathSourceInputLocationMap.get(currentDevice);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 384163d1395..50052c798dc 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index c88ab34483f..db9f6d6c55d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -29,7 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
-/** Used for previous and constant value fill */
+/** Used for previous and constant value fill. */
public class FillOperator implements ProcessOperator {
private final OperatorContext operatorContext;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index 232b6c08cc6..6e8cc0fe096 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -48,15 +48,15 @@ public class FilterAndProjectOperator implements
ProcessOperator {
private final Operator inputOperator;
- private List<LeafColumnTransformer> filterLeafColumnTransformerList;
+ private final List<LeafColumnTransformer> filterLeafColumnTransformerList;
- private ColumnTransformer filterOutputTransformer;
+ private final ColumnTransformer filterOutputTransformer;
- private List<ColumnTransformer> commonTransformerList;
+ private final List<ColumnTransformer> commonTransformerList;
- private List<LeafColumnTransformer> projectLeafColumnTransformerList;
+ private final List<LeafColumnTransformer> projectLeafColumnTransformerList;
- private List<ColumnTransformer> projectOutputTransformerList;
+ private final List<ColumnTransformer> projectOutputTransformerList;
private final TsBlockBuilder filterTsBlockBuilder;
@@ -67,6 +67,7 @@ public class FilterAndProjectOperator implements
ProcessOperator {
// false when we only need to do projection
private final boolean hasFilter;
+ @SuppressWarnings("squid:S107")
public FilterAndProjectOperator(
OperatorContext operatorContext,
Operator inputOperator,
@@ -117,10 +118,7 @@ public class FilterAndProjectOperator implements
ProcessOperator {
/**
* Return the TsBlock that contains both initial input columns and columns
of common
- * subexpressions after filtering
- *
- * @param input
- * @return
+ * subexpressions after filtering.
*/
private TsBlock getFilterTsBlock(TsBlock input) {
final TimeColumn originTimeColumn = input.getTimeColumn();
@@ -145,7 +143,6 @@ public class FilterAndProjectOperator implements
ProcessOperator {
resultColumns.add(input.getColumn(i));
}
- // todo: remove this if, add calculated common sub expressions anyway
if (!hasNonMappableUDF) {
// get result of calculated common sub expressions
for (ColumnTransformer columnTransformer : commonTransformerList) {
@@ -153,12 +150,32 @@ public class FilterAndProjectOperator implements
ProcessOperator {
}
}
+ int rowCount =
+ constructFilteredTsBlock(
+ resultColumns,
+ timeBuilder,
+ filterColumn,
+ originTimeColumn,
+ columnBuilders,
+ positionCount);
+
+ filterTsBlockBuilder.declarePositions(rowCount);
+ return filterTsBlockBuilder.build();
+ }
+
+ private int constructFilteredTsBlock(
+ List<Column> resultColumns,
+ TimeColumnBuilder timeBuilder,
+ Column filterColumn,
+ TimeColumn originTimeColumn,
+ ColumnBuilder[] columnBuilders,
+ int positionCount) {
// construct result TsBlock of filter
int rowCount = 0;
for (int i = 0, n = resultColumns.size(); i < n; i++) {
Column curColumn = resultColumns.get(i);
for (int j = 0; j < positionCount; j++) {
- if (!filterColumn.isNull(j) && filterColumn.getBoolean(j)) {
+ if (satisfy(filterColumn, j)) {
if (i == 0) {
rowCount++;
timeBuilder.writeLong(originTimeColumn.getLong(j));
@@ -171,9 +188,11 @@ public class FilterAndProjectOperator implements
ProcessOperator {
}
}
}
+ return rowCount;
+ }
- filterTsBlockBuilder.declarePositions(rowCount);
- return filterTsBlockBuilder.build();
+ private boolean satisfy(Column filterColumn, int rowIndex) {
+ return !filterColumn.isNull(rowIndex) && filterColumn.getBoolean(rowIndex);
}
private TsBlock getTransformedTsBlock(TsBlock input) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index d5e65d262ed..c58f3f9a1ae 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -42,6 +42,7 @@ public class IntoOperator extends AbstractIntoOperator {
private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
+ @SuppressWarnings("squid:S107")
public IntoOperator(
OperatorContext operatorContext,
Operator child,
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index b208e551b1e..8407f51fce4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 2d785311a6e..8ffc0489c5e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -33,14 +34,13 @@ import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
-/** Used for linear fill */
+/** Used for linear fill. */
public class LinearFillOperator implements ProcessOperator {
private final OperatorContext operatorContext;
private final ILinearFill[] fillArray;
private final Operator child;
private final int outputColumnCount;
- // TODO need to spill it to disk if it consumes too much memory
private final List<TsBlock> cachedTsBlock;
private final List<Long> cachedRowIndex;
@@ -49,8 +49,10 @@ public class LinearFillOperator implements ProcessOperator {
// next TsBlock Index for each Column
private final int[] nextTsBlockIndex;
- // indicate whether we can call child.next()
- // it's used to make sure that child.next() will only be called once in
LinearFillOperator.next();
+ /**
+ * indicate whether we can call child.next(). it's used to make sure that
child.next() will only
+ * be called once in LinearFillOperator.next().
+ */
private boolean canCallNext;
// indicate whether there is more TsBlock for child operator
private boolean noMoreTsBlock;
@@ -81,6 +83,7 @@ public class LinearFillOperator implements ProcessOperator {
return child.isBlocked();
}
+ @SuppressWarnings("squid:S3776")
@Override
public TsBlock next() throws Exception {
@@ -181,7 +184,7 @@ public class LinearFillOperator implements ProcessOperator {
}
/**
- * Judge whether we can use current cached TsBlock to fill Column
+ * Judge whether we can use current cached TsBlock to fill Column.
*
* @param columnIndex index for column which need to be filled
* @param currentEndRowIndex row index for endTime of column which need to
be filled
@@ -206,7 +209,10 @@ public class LinearFillOperator implements ProcessOperator
{
}
/**
+ * Try to get next TsBlock
+ *
* @return true if we succeed to get next TsBlock and add it into
cachedTsBlock, otherwise false
+ * @throws Exception errors happened while getting next batch data
*/
private boolean tryToGetNextTsBlock() throws Exception {
if (canCallNext) { // if we can call child.next(), we call that and cache
it in
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index d13798a5176..a11df165ed2 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -83,6 +84,7 @@ public class MergeSortOperator extends
AbstractConsumeAllOperator {
: successfulAsList(listenableFutures);
}
+ @SuppressWarnings({"squid:S3776", "squid:S135"})
@Override
public TsBlock next() throws Exception {
// start stopwatch
@@ -204,7 +206,8 @@ public class MergeSortOperator extends
AbstractConsumeAllOperator {
@Override
public long calculateRetainedSizeAfterCallingNext() {
- long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+ long currentRetainedSize = 0;
+ long minChildReturnSize = Long.MAX_VALUE;
for (Operator child : children) {
long maxReturnSize = child.calculateMaxReturnSize();
minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
@@ -223,7 +226,7 @@ public class MergeSortOperator extends
AbstractConsumeAllOperator {
protected boolean prepareInput() throws Exception {
boolean allReady = true;
for (int i = 0; i < inputOperatorsCount; i++) {
- if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
+ if (needCallNext(i)) {
continue;
}
if (canCallNext[i]) {
@@ -245,4 +248,8 @@ public class MergeSortOperator extends
AbstractConsumeAllOperator {
}
return allReady;
}
+
+ private boolean needCallNext(int i) {
+ return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 22429b431e2..af9acc47e33 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
index 6205cb766d0..d6be0d4d179 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
-// TODO should think about what interfaces should this ProcessOperator have
public interface ProcessOperator extends Operator {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index ed765d9b478..7c6dc64dd63 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -84,6 +84,7 @@ public class RawDataAggregationOperator extends
SingleInputAggregationOperator {
return windowManager.hasNext(hasMoreData());
}
+ @SuppressWarnings({"squid:S3776", "squid:S135"})
@Override
protected boolean calculateNextAggregationResult() throws Exception {
@@ -110,7 +111,9 @@ public class RawDataAggregationOperator extends
SingleInputAggregationOperator {
// If the window is not initialized, it just returns to avoid invoking
// updateResultTsBlock()
// but if it's skipping the last window, just break and keep skipping.
- if (needSkip || windowManager.isCurWindowInit()) break;
+ if (needSkip || windowManager.isCurWindowInit()) {
+ break;
+ }
return false;
}
}
@@ -138,6 +141,7 @@ public class RawDataAggregationOperator extends
SingleInputAggregationOperator {
return true;
}
+ @SuppressWarnings({"squid:S3776", "squid:S135"})
private boolean calculateFromRawData() {
// if window is not initialized, we should init window status and reset
aggregators
if (!windowManager.isCurWindowInit() &&
!skipPreviousWindowAndInitCurWindow()) {
@@ -179,7 +183,9 @@ public class RawDataAggregationOperator extends
SingleInputAggregationOperator {
}
// if no row needs to skip, just send a null parameter.
- if (!hasSkip) needProcess = null;
+ if (!hasSkip) {
+ needProcess = null;
+ }
for (Aggregator aggregator : aggregators) {
// Current agg method has been calculated
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 40938e335ed..302e6ff34aa 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -106,6 +106,7 @@ public abstract class SingleInputAggregationOperator
implements ProcessOperator
child.close();
}
+ @SuppressWarnings("squid:S112")
protected abstract boolean calculateNextAggregationResult() throws Exception;
protected abstract void updateResultTsBlock();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 3e8871da00d..e417073d8c1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -75,6 +75,7 @@ public class SlidingWindowAggregationOperator extends
SingleInputAggregationOper
return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
}
+ @SuppressWarnings("squid:S112")
@Override
protected boolean calculateNextAggregationResult() throws Exception {
if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
@@ -110,15 +111,16 @@ public class SlidingWindowAggregationOperator extends
SingleInputAggregationOper
return true;
}
- /** @return if already get the result */
+ /** return if already get the result. */
private boolean isCalculationDone() {
- if (curSubTimeRange == null && !subTimeRangeIterator.hasNextTimeRange()) {
- return true;
+ if (curSubTimeRange == null) {
+ if (!subTimeRangeIterator.hasNextTimeRange()) {
+ return true;
+ } else {
+ curSubTimeRange = subTimeRangeIterator.nextTimeRange();
+ }
}
- if (curSubTimeRange == null && subTimeRangeIterator.hasNextTimeRange()) {
- curSubTimeRange = subTimeRangeIterator.nextTimeRange();
- }
return ascending
? curSubTimeRange.getMin() > curTimeRange.getMax()
: curSubTimeRange.getMax() < curTimeRange.getMin();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 20dcac02f7e..328356029ef 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.commons.exception.IoTDBException;
@@ -135,7 +136,9 @@ public class SortOperator implements ProcessOperator {
}
private void prepareSortReaders() throws IoTDBException {
- if (sortReaders != null) return;
+ if (sortReaders != null) {
+ return;
+ }
sortReaders = new ArrayList<>();
if (cachedBytes != 0) {
@@ -208,21 +211,8 @@ public class SortOperator implements ProcessOperator {
private TsBlock mergeSort() throws IoTDBException {
- if (mergeSortHeap == null) {
- mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
- // 1. fill the input from each reader
- for (int i = 0; i < sortReaders.size(); i++) {
- SortReader sortReader = sortReaders.get(i);
- if (sortReader.hasNext()) {
- MergeSortKey mergeSortKey = sortReader.next();
- mergeSortKey.inputChannelIndex = i;
- mergeSortHeap.push(mergeSortKey);
- } else {
- noMoreData[i] = true;
- sortBufferManager.releaseOneSortBranch();
- }
- }
- }
+ // 1. fill the input from each reader
+ initMergeSortHeap();
long startTime = System.nanoTime();
long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
@@ -262,6 +252,23 @@ public class SortOperator implements ProcessOperator {
return tsBlockBuilder.build();
}
+ private void initMergeSortHeap() throws IoTDBException {
+ if (mergeSortHeap == null) {
+ mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
+ for (int i = 0; i < sortReaders.size(); i++) {
+ SortReader sortReader = sortReaders.get(i);
+ if (sortReader.hasNext()) {
+ MergeSortKey mergeSortKey = sortReader.next();
+ mergeSortKey.inputChannelIndex = i;
+ mergeSortHeap.push(mergeSortKey);
+ } else {
+ noMoreData[i] = true;
+ sortBufferManager.releaseOneSortBranch();
+ }
+ }
+ }
+ }
+
private MergeSortKey readNextMergeSortKey(int readerIndex) throws
IoTDBException {
SortReader sortReader = sortReaders.get(readerIndex);
if (sortReader.hasNext()) {
@@ -273,7 +280,9 @@ public class SortOperator implements ProcessOperator {
}
private boolean hasMoreData() {
- if (noMoreData == null) return true;
+ if (noMoreData == null) {
+ return true;
+ }
for (boolean noMore : noMoreData) {
if (!noMore) {
return true;
@@ -283,7 +292,9 @@ public class SortOperator implements ProcessOperator {
}
public void clear() {
- if (!diskSpiller.hasSpilledData()) return;
+ if (!diskSpiller.hasSpilledData()) {
+ return;
+ }
try {
if (sortReaders != null) {
for (SortReader sortReader : sortReaders) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
index 0360028c2bb..299f637eb9e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -57,7 +57,7 @@ public class TagAggregationOperator extends
AbstractConsumeAllOperator {
this.groups = Validate.notNull(groups);
this.groupedAggregators = Validate.notNull(groupedAggregators);
List<TSDataType> actualOutputColumnTypes = new ArrayList<>();
- for (String ignored : groups.get(0)) {
+ for (int i = 0; i < groups.get(0).size(); i++) {
actualOutputColumnTypes.add(TSDataType.TEXT);
}
for (int outputColumnIdx = 0;
@@ -104,44 +104,50 @@ public class TagAggregationOperator extends
AbstractConsumeAllOperator {
rowBlocks[i] = inputTsBlocks[i].getRegion(consumedIndices[i], 1);
}
for (int groupIdx = 0; groupIdx < groups.size(); groupIdx++) {
- List<String> group = groups.get(groupIdx);
List<Aggregator> aggregators = groupedAggregators.get(groupIdx);
+ aggregate(aggregators, rowBlocks);
+ List<String> group = groups.get(groupIdx);
+ appendOneRow(rowBlocks, group, aggregators);
+ }
- for (Aggregator aggregator : aggregators) {
- if (aggregator == null) {
- continue;
- }
- aggregator.reset();
- aggregator.processTsBlocks(rowBlocks);
+ // Reset dataReady for next iteration
+ for (int i = 0; i < children.size(); i++) {
+ consumedIndices[i]++;
+ }
+ }
+
+ private void aggregate(List<Aggregator> aggregators, TsBlock[] rowBlocks) {
+ for (Aggregator aggregator : aggregators) {
+ if (aggregator == null) {
+ continue;
}
+ aggregator.reset();
+ aggregator.processTsBlocks(rowBlocks);
+ }
+ }
- TimeColumnBuilder timeColumnBuilder =
tsBlockBuilder.getTimeColumnBuilder();
- timeColumnBuilder.writeLong(rowBlocks[0].getStartTime());
- ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+ private void appendOneRow(TsBlock[] rowBlocks, List<String> group,
List<Aggregator> aggregators) {
+ TimeColumnBuilder timeColumnBuilder =
tsBlockBuilder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(rowBlocks[0].getStartTime());
+ ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
- for (int i = 0; i < group.size(); i++) {
- if (group.get(i) == null) {
- columnBuilders[i].writeBinary(new Binary("NULL"));
- } else {
- columnBuilders[i].writeBinary(new Binary(group.get(i)));
- }
- }
- for (int i = 0; i < aggregators.size(); i++) {
- Aggregator aggregator = aggregators.get(i);
- ColumnBuilder columnBuilder = columnBuilders[i + group.size()];
- if (aggregator == null) {
- columnBuilder.appendNull();
- } else {
- aggregator.outputResult(new ColumnBuilder[] {columnBuilder});
- }
+ for (int i = 0; i < group.size(); i++) {
+ if (group.get(i) == null) {
+ columnBuilders[i].writeBinary(new Binary("NULL"));
+ } else {
+ columnBuilders[i].writeBinary(new Binary(group.get(i)));
}
- tsBlockBuilder.declarePosition();
}
-
- // Reset dataReady for next iteration
- for (int i = 0; i < children.size(); i++) {
- consumedIndices[i]++;
+ for (int i = 0; i < aggregators.size(); i++) {
+ Aggregator aggregator = aggregators.get(i);
+ ColumnBuilder columnBuilder = columnBuilders[i + group.size()];
+ if (aggregator == null) {
+ columnBuilder.appendNull();
+ } else {
+ aggregator.outputResult(new ColumnBuilder[] {columnBuilder});
+ }
}
+ tsBlockBuilder.declarePosition();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index f5c056a3e4f..2331f1e7daf 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -44,10 +44,7 @@ import
org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@@ -56,8 +53,6 @@ import java.util.Map;
public class TransformOperator implements ProcessOperator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TransformOperator.class);
-
protected final float udfReaderMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
protected final float udfTransformerMemoryBudgetInMB =
@@ -79,6 +74,7 @@ public class TransformOperator implements ProcessOperator {
private final String udtfQueryId;
+ @SuppressWarnings("squid:S107")
public TransformOperator(
OperatorContext operatorContext,
Operator inputOperator,
@@ -89,7 +85,7 @@ public class TransformOperator implements ProcessOperator {
ZoneId zoneId,
Map<NodeRef<Expression>, TSDataType> expressionTypes,
boolean isAscending)
- throws QueryProcessException, IOException {
+ throws QueryProcessException {
this.operatorContext = operatorContext;
this.inputOperator = inputOperator;
this.keepNull = keepNull;
@@ -158,6 +154,7 @@ public class TransformOperator implements ProcessOperator {
return YieldableState.YIELDABLE;
}
+ @SuppressWarnings("squid:S135")
protected YieldableState iterateReaderToNextValid(LayerPointReader reader)
throws Exception {
// Since a constant operand is not allowed to be a result column, the
reader will not be
// a ConstantLayerPointReader.
@@ -174,6 +171,7 @@ public class TransformOperator implements ProcessOperator {
return yieldableState;
}
+ @SuppressWarnings("squid:S112")
@Override
public final boolean hasNext() throws Exception {
if (!timeHeap.isEmpty()) {
@@ -184,12 +182,12 @@ public class TransformOperator implements ProcessOperator
{
return true;
}
} catch (Exception e) {
- LOGGER.error("TransformOperator#hasNext()", e);
throw new RuntimeException(e);
}
return !timeHeap.isEmpty();
}
+ @SuppressWarnings("squid:S112")
@Override
public TsBlock next() throws Exception {
@@ -200,13 +198,7 @@ public class TransformOperator implements ProcessOperator {
}
final TsBlockBuilder tsBlockBuilder =
TsBlockBuilder.createWithOnlyTimeColumn();
- if (outputDataTypes == null) {
- outputDataTypes = new ArrayList<>();
- for (LayerPointReader reader : transformers) {
- outputDataTypes.add(reader.getDataType());
- }
- }
- tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
+ prepareTsBlockBuilder(tsBlockBuilder);
final TimeColumnBuilder timeBuilder =
tsBlockBuilder.getTimeColumnBuilder();
final ColumnBuilder[] columnBuilders =
tsBlockBuilder.getValueColumnBuilders();
final int columnCount = columnBuilders.length;
@@ -232,11 +224,7 @@ public class TransformOperator implements ProcessOperator {
}
}
- for (int i = 0; i < columnCount; ++i) {
- if (shouldIterateReadersToNextValid[i]) {
- transformers[i].readyForNext();
- }
- }
+ prepareEachColumn(columnCount);
++rowCount;
@@ -252,11 +240,28 @@ public class TransformOperator implements ProcessOperator
{
tsBlockBuilder.declarePositions(rowCount);
return tsBlockBuilder.build();
} catch (Exception e) {
- LOGGER.error("TransformOperator#next()", e);
throw new RuntimeException(e);
}
}
+ protected void prepareTsBlockBuilder(TsBlockBuilder tsBlockBuilder) {
+ if (outputDataTypes == null) {
+ outputDataTypes = new ArrayList<>();
+ for (LayerPointReader reader : transformers) {
+ outputDataTypes.add(reader.getDataType());
+ }
+ }
+ tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
+ }
+
+ private void prepareEachColumn(int columnCount) {
+ for (int i = 0; i < columnCount; ++i) {
+ if (shouldIterateReadersToNextValid[i]) {
+ transformers[i].readyForNext();
+ }
+ }
+ }
+
protected boolean collectReaderAppendIsNull(LayerPointReader reader, long
currentTime)
throws Exception {
final YieldableState yieldableState = reader.yield();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index 883dadfd5f4..0458572f4d8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -61,26 +61,31 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
isAligned = true;
}
+ @SuppressWarnings("squid:S3740")
@Override
- protected Statistics currentFileStatistics(int index) throws IOException {
+ protected Statistics currentFileStatistics(int index) {
return ((AlignedTimeSeriesMetadata)
firstTimeSeriesMetadata).getStatistics(index);
}
+ @SuppressWarnings("squid:S3740")
@Override
- protected Statistics currentFileTimeStatistics() throws IOException {
+ protected Statistics currentFileTimeStatistics() {
return ((AlignedTimeSeriesMetadata)
firstTimeSeriesMetadata).getTimeStatistics();
}
+ @SuppressWarnings("squid:S3740")
@Override
- protected Statistics currentChunkStatistics(int index) throws IOException {
+ protected Statistics currentChunkStatistics(int index) {
return ((AlignedChunkMetadata) firstChunkMetadata).getStatistics(index);
}
+ @SuppressWarnings("squid:S3740")
@Override
protected Statistics currentChunkTimeStatistics() {
return ((AlignedChunkMetadata) firstChunkMetadata).getTimeStatistics();
}
+ @SuppressWarnings("squid:S3740")
@Override
protected Statistics currentPageStatistics(int index) throws IOException {
if (firstPageReader == null) {
@@ -89,6 +94,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
return firstPageReader.getStatistics(index);
}
+ @SuppressWarnings("squid:S3740")
@Override
protected Statistics currentPageTimeStatistics() throws IOException {
if (firstPageReader == null) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 728d88ee23a..5e3089c7623 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -70,6 +70,9 @@ import static
org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLO
public class SeriesScanUtil {
+ private static final String ONE_SENSOR_ERROR_MSG =
+ "Only one sensor in non-aligned SeriesScanUtil.";
+
private final QueryContext context;
// The path of the target series which will be scanned.
@@ -214,6 +217,7 @@ public class SeriesScanUtil {
return firstTimeSeriesMetadata != null;
}
+ @SuppressWarnings("squid:S3740")
boolean isFileOverlapped() throws IOException {
if (firstTimeSeriesMetadata == null) {
throw new IOException("no first file");
@@ -227,16 +231,19 @@ public class SeriesScanUtil {
fileStatistics,
unSeqTimeSeriesMetadata.peek().getStatistics());
}
+ @SuppressWarnings("squid:S3740")
Statistics currentFileStatistics() {
return firstTimeSeriesMetadata.getStatistics();
}
- protected Statistics currentFileStatistics(int index) throws IOException {
- checkArgument(index == 0, "Only one sensor in non-aligned
SeriesScanUtil.");
+ @SuppressWarnings("squid:S3740")
+ protected Statistics currentFileStatistics(int index) {
+ checkArgument(index == 0, ONE_SENSOR_ERROR_MSG);
return currentFileStatistics();
}
- protected Statistics currentFileTimeStatistics() throws IOException {
+ @SuppressWarnings("squid:S3740")
+ protected Statistics currentFileTimeStatistics() {
return currentFileStatistics();
}
@@ -287,6 +294,7 @@ public class SeriesScanUtil {
return firstChunkMetadata != null;
}
+ @SuppressWarnings("squid:S3740")
protected void filterFirstChunkMetadata() throws IOException {
if (firstChunkMetadata != null && !isChunkOverlapped() &&
!firstChunkMetadata.isModified()) {
Filter queryFilter = scanOptions.getQueryFilter();
@@ -353,8 +361,7 @@ public class SeriesScanUtil {
}
}
- protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata
timeSeriesMetadata)
- throws IOException {
+ protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata
timeSeriesMetadata) {
List<IChunkMetadata> chunkMetadataList =
FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata);
chunkMetadataList.forEach(chunkMetadata ->
chunkMetadata.setSeq(timeSeriesMetadata.isSeq()));
@@ -362,6 +369,7 @@ public class SeriesScanUtil {
cachedChunkMetadata.addAll(chunkMetadataList);
}
+ @SuppressWarnings("squid:S3740")
boolean isChunkOverlapped() throws IOException {
if (firstChunkMetadata == null) {
throw new IOException("no first chunk");
@@ -372,16 +380,19 @@ public class SeriesScanUtil {
&& orderUtils.isOverlapped(chunkStatistics,
cachedChunkMetadata.peek().getStatistics());
}
+ @SuppressWarnings("squid:S3740")
Statistics currentChunkStatistics() {
return firstChunkMetadata.getStatistics();
}
- protected Statistics currentChunkStatistics(int index) throws IOException {
- checkArgument(index == 0, "Only one sensor in non-aligned
SeriesScanUtil.");
+ @SuppressWarnings("squid:S3740")
+ protected Statistics currentChunkStatistics(int index) {
+ checkArgument(index == 0, ONE_SENSOR_ERROR_MSG);
return currentChunkStatistics();
}
- protected Statistics currentChunkTimeStatistics() throws IOException {
+ @SuppressWarnings("squid:S3740")
+ protected Statistics currentChunkTimeStatistics() {
return currentChunkStatistics();
}
@@ -567,6 +578,7 @@ public class SeriesScanUtil {
* <p>hasNextPage may cache firstPageReader if it is not overlapped or
cached a BatchData if the
* first page is overlapped
*/
+ @SuppressWarnings("squid:S3740")
boolean isPageOverlapped() throws IOException {
/*
@@ -595,6 +607,7 @@ public class SeriesScanUtil {
&& orderUtils.isOverlapped(firstPageStatistics,
unSeqPageReaders.peek().getStatistics());
}
+ @SuppressWarnings("squid:S3740")
Statistics currentPageStatistics() {
if (firstPageReader == null) {
return null;
@@ -602,11 +615,13 @@ public class SeriesScanUtil {
return firstPageReader.getStatistics();
}
+ @SuppressWarnings("squid:S3740")
protected Statistics currentPageStatistics(int index) throws IOException {
- checkArgument(index == 0, "Only one sensor in non-aligned
SeriesScanUtil.");
+ checkArgument(index == 0, ONE_SENSOR_ERROR_MSG);
return currentPageStatistics();
}
+ @SuppressWarnings("squid:S3740")
protected Statistics currentPageTimeStatistics() throws IOException {
return currentPageStatistics();
}
@@ -671,7 +686,6 @@ public class SeriesScanUtil {
// may has overlapped data
if (mergeReader.hasNextTimeValuePair()) {
- // TODO we still need to consider data type, ascending and
descending here
TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
@@ -1033,6 +1047,7 @@ public class SeriesScanUtil {
}
}
+ @SuppressWarnings("squid:S3740")
protected void filterFirstTimeSeriesMetadata() throws IOException {
if (firstTimeSeriesMetadata != null
&& !isFileOverlapped()
@@ -1132,10 +1147,12 @@ public class SeriesScanUtil {
this.isMem = data instanceof MemPageReader || data instanceof
MemAlignedPageReader;
}
+ @SuppressWarnings("squid:S3740")
Statistics getStatistics() {
return data.getStatistics();
}
+ @SuppressWarnings("squid:S3740")
Statistics getStatistics(int index) throws IOException {
if (!(data instanceof IAlignedPageReader)) {
throw new IOException("Can only get statistics by index from
AlignedPageReader");
@@ -1143,6 +1160,7 @@ public class SeriesScanUtil {
return ((IAlignedPageReader) data).getStatistics(index);
}
+ @SuppressWarnings("squid:S3740")
Statistics getTimeStatistics() throws IOException {
if (!(data instanceof IAlignedPageReader)) {
throw new IOException("Can only get statistics of time column from
AlignedPageReader");
@@ -1230,26 +1248,31 @@ public class SeriesScanUtil {
class DescTimeOrderUtils implements TimeOrderUtils {
+ @SuppressWarnings("squid:S3740")
@Override
public long getOrderTime(Statistics statistics) {
return statistics.getEndTime();
}
+ @SuppressWarnings("squid:S3740")
@Override
public long getOrderTime(TsFileResource fileResource) {
return fileResource.getEndTime(seriesPath.getDevice());
}
+ @SuppressWarnings("squid:S3740")
@Override
public long getOverlapCheckTime(Statistics range) {
return range.getStartTime();
}
+ @SuppressWarnings("squid:S3740")
@Override
public boolean isOverlapped(Statistics left, Statistics right) {
return left.getStartTime() <= right.getEndTime();
}
+ @SuppressWarnings("squid:S3740")
@Override
public boolean isOverlapped(long time, Statistics right) {
return time <= right.getEndTime();
@@ -1348,26 +1371,31 @@ public class SeriesScanUtil {
class AscTimeOrderUtils implements TimeOrderUtils {
+ @SuppressWarnings("squid:S3740")
@Override
public long getOrderTime(Statistics statistics) {
return statistics.getStartTime();
}
+ @SuppressWarnings("squid:S3740")
@Override
public long getOrderTime(TsFileResource fileResource) {
return fileResource.getStartTime(seriesPath.getDevice());
}
+ @SuppressWarnings("squid:S3740")
@Override
public long getOverlapCheckTime(Statistics range) {
return range.getEndTime();
}
+ @SuppressWarnings("squid:S3740")
@Override
public boolean isOverlapped(Statistics left, Statistics right) {
return left.getEndTime() >= right.getStartTime();
}
+ @SuppressWarnings("squid:S3740")
@Override
public boolean isOverlapped(long time, Statistics right) {
return time >= right.getStartTime();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 0c2d75b35f3..0ebf65484be 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -46,7 +46,6 @@ import
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewIntoOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
@@ -158,7 +157,6 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryS
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
@@ -212,7 +210,6 @@ import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -778,39 +775,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
operatorContext, node.getDevices(), children, deviceColumnIndex,
outputColumnTypes);
}
- @Deprecated
- @Override
- public Operator visitDeviceMerge(DeviceMergeNode node,
LocalExecutionPlanContext context) {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- DeviceMergeOperator.class.getSimpleName());
- List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node,
context);
- List<TSDataType> dataTypes = getOutputColumnTypes(node,
context.getTypeProvider());
- TimeSelector selector = null;
- TimeComparator timeComparator = null;
- for (SortItem sortItem : node.getMergeOrderParameter().getSortItemList()) {
- if (Objects.equals(sortItem.getSortKey(), OrderByKey.TIME)) {
- Ordering ordering = sortItem.getOrdering();
- if (ordering == Ordering.ASC) {
- selector = new TimeSelector(node.getChildren().size() << 1, true);
- timeComparator = ASC_TIME_COMPARATOR;
- } else {
- selector = new TimeSelector(node.getChildren().size() << 1, false);
- timeComparator = DESC_TIME_COMPARATOR;
- }
- break;
- }
- }
-
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- return new DeviceMergeOperator(
- operatorContext, node.getDevices(), children, dataTypes, selector,
timeComparator);
- }
-
@Override
public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext
context) {
OperatorContext operatorContext =
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index bea10d48463..2ca08105c1d 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
@@ -49,7 +48,6 @@ import
org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.join.HorizontallyConcatOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import
org.apache.iotdb.db.mpp.execution.operator.process.last.AbstractUpdateLastCacheOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
@@ -88,7 +86,6 @@ import
org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditi
import
org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -557,52 +554,6 @@ public class OperatorMemoryTest {
assertEquals(512,
linearFillOperator.calculateRetainedSizeAfterCallingNext());
}
- @Test
- public void deviceMergeOperatorTest() {
- List<Operator> children = new ArrayList<>(4);
- List<TSDataType> dataTypeList = new ArrayList<>(2);
- dataTypeList.add(TSDataType.INT32);
- dataTypeList.add(TSDataType.INT32);
- List<String> devices = new ArrayList<>(4);
- devices.add("device1");
- devices.add("device2");
- devices.add("device3");
- devices.add("device4");
- long expectedMaxReturnSize =
- 3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- long expectedMaxPeekMemory =
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- long expectedRetainedSizeAfterCallingNext = 0;
- long childrenMaxPeekMemory = 0;
-
- for (int i = 0; i < 4; i++) {
- Operator child = Mockito.mock(Operator.class);
- Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
- Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
-
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 *
1024L);
- expectedMaxPeekMemory += 128 * 1024L;
- childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory,
child.calculateMaxPeekMemory());
- expectedRetainedSizeAfterCallingNext += 128 * 1024L;
- children.add(child);
- }
-
- expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory,
childrenMaxPeekMemory);
-
- DeviceMergeOperator deviceMergeOperator =
- new DeviceMergeOperator(
- Mockito.mock(OperatorContext.class),
- devices,
- children,
- dataTypeList,
- Mockito.mock(TimeSelector.class),
- Mockito.mock(TimeComparator.class));
-
- assertEquals(expectedMaxPeekMemory,
deviceMergeOperator.calculateMaxPeekMemory());
- assertEquals(expectedMaxReturnSize,
deviceMergeOperator.calculateMaxReturnSize());
- assertEquals(
- expectedRetainedSizeAfterCallingNext - 64 * 1024L,
- deviceMergeOperator.calculateRetainedSizeAfterCallingNext());
- }
-
@Test
public void deviceViewOperatorTest() {
List<Operator> children = new ArrayList<>(4);