This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/sonar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/sonar by this push:
new b6a87055f11 server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last done
b6a87055f11 is described below
commit b6a87055f118707f3bbcdfec9299505d84977182
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 21 08:58:13 2023 +0800
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last done
---
.../last/AlignedUpdateLastCacheOperator.java | 2 +-
.../process/last/LastQueryCollectOperator.java | 1 +
.../process/last/LastQueryMergeOperator.java | 123 +++++++++++++--------
.../operator/process/last/LastQueryOperator.java | 1 +
.../process/last/LastQuerySortOperator.java | 52 ++++++---
.../operator/process/last/LastQueryUtil.java | 5 +
.../process/last/UpdateLastCacheOperator.java | 3 +-
7 files changed, 120 insertions(+), 67 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
index 32c4260d1ea..0ac054023d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import static org.weakref.jmx.internal.guava.base.Preconditions.checkArgument;
-/** update last cache for aligned series */
+/** update last cache for aligned series. */
public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
private final AlignedPath seriesPath;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index 7e196646acd..53cfffee3ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index cb21ba62b07..557eac5bc5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -55,7 +56,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
/** TsBlock from child operator. Only one cache now. */
private final TsBlock[] inputTsBlocks;
- /** start index for each input TsBlocks and size of it is equal to inputTsBlocks */
+ /** start index for each input TsBlocks and size of it is equal to inputTsBlocks. */
private final int[] inputIndex;
/**
@@ -117,51 +118,22 @@ public class LastQueryMergeOperator implements ProcessOperator {
// among all the input TsBlock as the current output TsBlock's endTimeSeries.
for (int i = 0; i < inputOperatorsCount; i++) {
Operator currentChild = children.get(i);
- if (!noMoreTsBlocks[i] && empty(i) && currentChild != null) {
- if (currentChild.hasNextWithTimer()) {
- inputIndex[i] = 0;
- inputTsBlocks[i] = currentChild.nextWithTimer();
- if (!empty(i)) {
- int rowSize = inputTsBlocks[i].getPositionCount();
- for (int row = 0; row < rowSize; row++) {
- Binary key = getTimeSeries(inputTsBlocks[i], row);
- Location location = timeSeriesSelector.get(key);
- if (location == null
- || inputTsBlocks[i].getTimeByIndex(row)
- > inputTsBlocks[location.tsBlockIndex].getTimeByIndex(location.rowIndex)) {
- timeSeriesSelector.put(key, new Location(i, row));
- }
- }
- } else {
- // child operator has next but return an empty TsBlock which means that it may not
- // finish calculation in given time slice.
- // In such case, LastQueryMergeOperator can't go on calculating, so we just return
- // null.
- // We can also use the while loop here to continuously call the hasNext() and next()
- // methods of the child operator until its hasNext() returns false or the next() gets
- // the data that is not empty, but this will cause the execution time of the while
- // loop
- // to be uncontrollable and may exceed all allocated time slice
- return null;
- }
- } else { // no more tsBlock
- noMoreTsBlocks[i] = true;
- inputTsBlocks[i] = null;
- currentChild.close();
- children.set(i, null);
- }
+ if (needCallNext(i, currentChild) && prepareNext(i, currentChild)) {
+ return null;
}
// update the currentEndTimeSeries if the TsBlock is not empty
if (!empty(i)) {
Binary endTimeSeries =
getTimeSeries(inputTsBlocks[i], inputTsBlocks[i].getPositionCount() - 1);
- currentEndTimeSeries =
- init
- ? (comparator.compare(currentEndTimeSeries, endTimeSeries) < 0
- ? currentEndTimeSeries
- : endTimeSeries)
- : endTimeSeries;
- init = true;
+ if (init) {
+ currentEndTimeSeries =
+ comparator.compare(currentEndTimeSeries, endTimeSeries) < 0
+ ? currentEndTimeSeries
+ : endTimeSeries;
+ } else {
+ currentEndTimeSeries = endTimeSeries;
+ init = true;
+ }
}
}
@@ -171,11 +143,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
return res;
}
- while (!timeSeriesSelector.isEmpty()
- && (comparator.compare(timeSeriesSelector.firstKey(), currentEndTimeSeries) <= 0)) {
- Location location = timeSeriesSelector.pollFirstEntry().getValue();
- appendLastValue(tsBlockBuilder, inputTsBlocks[location.tsBlockIndex], location.rowIndex);
- }
+ calcCurrentBatch(currentEndTimeSeries);
clearTsBlockCache(currentEndTimeSeries);
@@ -184,6 +152,64 @@ public class LastQueryMergeOperator implements ProcessOperator {
return res;
}
+ private boolean needCallNext(int i, Operator currentChild) {
+ return !noMoreTsBlocks[i] && empty(i) && currentChild != null;
+ }
+
+ /**
+ * prepare next batch
+ *
+ * @return true if need to break current loop, otherwise false
+ * @throws Exception errors happened while fetching next batch data
+ */
+ private boolean prepareNext(int i, Operator currentChild) throws Exception {
+ if (currentChild.hasNextWithTimer()) {
+ inputIndex[i] = 0;
+ inputTsBlocks[i] = currentChild.nextWithTimer();
+ if (!empty(i)) {
+ collectTimeSeries(i);
+ } else {
+ // child operator has next but return an empty TsBlock which means that it may not
+ // finish calculation in given time slice.
+ // In such case, LastQueryMergeOperator can't go on calculating, so we just return
+ // null.
+ // We can also use the while loop here to continuously call the hasNext() and next()
+ // methods of the child operator until its hasNext() returns false or the next() gets
+ // the data that is not empty, but this will cause the execution time of the while
+ // loop
+ // to be uncontrollable and may exceed all allocated time slice
+ return true;
+ }
+ } else { // no more tsBlock
+ noMoreTsBlocks[i] = true;
+ inputTsBlocks[i] = null;
+ currentChild.close();
+ children.set(i, null);
+ }
+ return false;
+ }
+
+ private void collectTimeSeries(int i) {
+ int rowSize = inputTsBlocks[i].getPositionCount();
+ for (int row = 0; row < rowSize; row++) {
+ Binary key = getTimeSeries(inputTsBlocks[i], row);
+ Location location = timeSeriesSelector.get(key);
+ if (location == null
+ || inputTsBlocks[i].getTimeByIndex(row)
+ > inputTsBlocks[location.tsBlockIndex].getTimeByIndex(location.rowIndex)) {
+ timeSeriesSelector.put(key, new Location(i, row));
+ }
+ }
+ }
+
+ private void calcCurrentBatch(Binary currentEndTimeSeries) {
+ while (!timeSeriesSelector.isEmpty()
+ && (comparator.compare(timeSeriesSelector.firstKey(), currentEndTimeSeries) <= 0)) {
+ Location location = timeSeriesSelector.pollFirstEntry().getValue();
+ appendLastValue(tsBlockBuilder, inputTsBlocks[location.tsBlockIndex], location.rowIndex);
+ }
+ }
+
@Override
public boolean hasNext() throws Exception {
if (finished) {
@@ -265,7 +291,8 @@ public class LastQueryMergeOperator implements ProcessOperator {
@Override
public long calculateRetainedSizeAfterCallingNext() {
- long childrenSum = 0, minChildReturnSize = Long.MAX_VALUE;
+ long childrenSum = 0;
+ long minChildReturnSize = Long.MAX_VALUE;
for (Operator child : children) {
long maxReturnSize = child.calculateMaxReturnSize();
childrenSum += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
@@ -279,7 +306,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
- * return false;
+ * return false.
*/
private boolean empty(int columnIndex) {
return inputTsBlocks[columnIndex] == null
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 830ef97f1e6..9673f9b36ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 9c32e9cd3b8..fe002a43c0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -133,30 +134,22 @@ public class LastQuerySortOperator implements ProcessOperator {
return res;
}
+ return buildResult();
+ }
+
+ private TsBlock buildResult() throws Exception {
// start stopwatch
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
int endIndex = getEndIndex();
- while ((System.nanoTime() - start < maxRuntime)
- && (currentIndex < endIndex
- || (previousTsBlock != null
- && previousTsBlockIndex < previousTsBlock.getPositionCount()))
- && !tsBlockBuilder.isFull()) {
- if (previousTsBlock == null || previousTsBlock.getPositionCount() <= previousTsBlockIndex) {
- if (children.get(currentIndex).hasNextWithTimer()) {
- previousTsBlock = children.get(currentIndex).nextWithTimer();
- previousTsBlockIndex = 0;
- if (previousTsBlock == null) {
- return null;
- }
- } else {
- children.get(currentIndex).close();
- children.set(currentIndex, null);
- }
- currentIndex++;
+ while (keepGoing(start, maxRuntime, endIndex)) {
+
+ if (prepareData()) {
+ return null;
}
+
if (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
if (canUseDataFromCachedTsBlock(previousTsBlock, previousTsBlockIndex)) {
LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
@@ -171,6 +164,31 @@ public class LastQuerySortOperator implements ProcessOperator {
return res;
}
+ private boolean keepGoing(long start, long maxRuntime, int endIndex) {
+ return (System.nanoTime() - start < maxRuntime)
+ && (currentIndex < endIndex
+ || (previousTsBlock != null
+ && previousTsBlockIndex < previousTsBlock.getPositionCount()))
+ && !tsBlockBuilder.isFull();
+ }
+
+ private boolean prepareData() throws Exception {
+ if (previousTsBlock == null || previousTsBlock.getPositionCount() <= previousTsBlockIndex) {
+ if (children.get(currentIndex).hasNextWithTimer()) {
+ previousTsBlock = children.get(currentIndex).nextWithTimer();
+ previousTsBlockIndex = 0;
+ if (previousTsBlock == null) {
+ return true;
+ }
+ } else {
+ children.get(currentIndex).close();
+ children.set(currentIndex, null);
+ }
+ currentIndex++;
+ }
+ return false;
+ }
+
@Override
public boolean hasNext() throws Exception {
return currentIndex < inputOperatorsCount
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
index c674fc44f86..2e7e23b5bb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -45,6 +46,10 @@ public class LastQueryUtil {
private static final boolean CACHE_ENABLED =
CommonDescriptor.getInstance().getConfig().isLastCacheEnable();
+ private LastQueryUtil() {
+ // util class doesn't need constructor
+ }
+
public static TsBlockBuilder createTsBlockBuilder() {
return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 44fb538b64b..11d36719d63 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.commons.path.MeasurementPath;
@@ -36,7 +37,7 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
// accept PartialPath
private final MeasurementPath fullPath;
- // dataType for queried time series;
+ // type for queried time series
protected final String dataType;
public UpdateLastCacheOperator(