This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e3de1b7964a Add max_by, min_by aggregation support
e3de1b7964a is described below
commit e3de1b7964af8f7dd23fe3060a6f5917d54db781
Author: Beyyes <[email protected]>
AuthorDate: Sat Oct 19 09:07:21 2024 +0800
Add max_by, min_by aggregation support
---
.../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 12 +
.../relational/aggregation/AccumulatorFactory.java | 4 +
.../relational/aggregation/LastAccumulator.java | 3 +-
.../aggregation/TableMaxByAccumulator.java | 73 ++++
.../aggregation/TableMaxMinByBaseAccumulator.java | 486 +++++++++++++++++++++
.../aggregation/TableMinByAccumulator.java | 73 ++++
6 files changed, 650 insertions(+), 1 deletion(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index d7de73b7283..01d62a5c348 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -1050,6 +1050,18 @@ public class IoTDBMultiIDsWithAttributesTableIT {
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
}
+ // Checks the max_by/min_by aggregations on table0, always using time as x and the
+ // floatnum, date, ts and stringv columns as y.
+ // The query selects ten aggregates, so a single result row with ten columns is expected.
+ @Test
+ public void maxByMinByTest() {
+ String[] expectedHeader1 = buildHeaders(10);
+ sql =
+ "select
max_by(time,floatnum),min_by(time,floatnum),max_by(time,date),min_by(time,date),max_by(time,floatnum),min_by(time,floatnum),max_by(time,ts),min_by(time,ts),max_by(time,stringv),min_by(time,stringv)
from table0";
+ retArray =
+ new String[] {
+
"1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-01-01T00:00:10.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:00:01.000Z,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
+ }
+
// ==================================================================
// ============================ Join Test ===========================
// ==================================================================
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index fc672796b04..55a2c1e28a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -146,6 +146,10 @@ public class AccumulatorFactory {
return new LastByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1), false, false);
case FIRST_BY:
return new FirstByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1), false, false);
+ case MAX_BY:
+ return new TableMaxByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1));
+ case MIN_BY:
+ return new TableMinByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1));
default:
throw new IllegalArgumentException("Invalid Aggregation function: " +
aggregationType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
index 0a359de585a..df94faae8fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
@@ -151,7 +151,8 @@ public class LastAccumulator implements TableAccumulator {
public void evaluateIntermediate(ColumnBuilder columnBuilder) {
checkArgument(
columnBuilder instanceof BinaryColumnBuilder,
- "intermediate input and output of Avg should be BinaryColumn");
+ "intermediate input and output of Last should be BinaryColumn");
+
if (!initResult) {
columnBuilder.appendNull();
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
new file mode 100644
index 00000000000..ef0ebbb2b79
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+/**
+ * Accumulator for {@code max_by(x, y)}: keeps the x value that accompanies the largest y seen so
+ * far.
+ *
+ * <p>Every {@code check} overload is a strict "greater than" test, so when several rows share the
+ * maximum y the x of the first such row is kept.
+ */
+public class TableMaxByAccumulator extends TableMaxMinByBaseAccumulator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(TableMaxByAccumulator.class);
+
+  /**
+   * @param xDataType type of the returned column x.
+   * @param yDataType type of the column y whose maximum is tracked.
+   */
+  protected TableMaxByAccumulator(TSDataType xDataType, TSDataType yDataType) {
+    super(xDataType, yDataType);
+  }
+
+  @Override
+  protected boolean check(int yValue, int yExtremeValue) {
+    return yValue > yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(long yValue, long yExtremeValue) {
+    return yValue > yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(float yValue, float yExtremeValue) {
+    return yValue > yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(double yValue, double yExtremeValue) {
+    return yValue > yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(Binary yValue, Binary yExtremeValue) {
+    return yValue.compareTo(yExtremeValue) > 0;
+  }
+
+  @Override
+  protected boolean check(boolean yValue, boolean yExtremeValue) {
+    // With false < true, yValue is strictly greater only when it is true and the current maximum
+    // is still false. Returning yValue alone would also replace the stored x on a true/true tie,
+    // unlike the strict comparisons used for every other type.
+    return yValue && !yExtremeValue;
+  }
+
+  /** Returns the shallow size of this instance as computed by {@link RamUsageEstimator}. */
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE;
+  }
+
+  /** Returns a new, empty accumulator configured with the same x and y data types. */
+  @Override
+  public TableAccumulator copy() {
+    return new TableMaxByAccumulator(this.xDataType, this.yDataType);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
new file mode 100644
index 00000000000..2b883e3c0ee
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Shared implementation of the {@code max_by(x, y)} and {@code min_by(x, y)} table aggregations,
+ * which return the value of x associated with the maximum (respectively minimum) value of y over
+ * all input values.
+ *
+ * <p>Subclasses only provide the {@code check} overloads that decide whether an incoming y
+ * replaces the current extreme. Rows whose y is null are ignored; if the row holding the extreme
+ * y has a null x, the final result is null.
+ *
+ * <p>The intermediate state is exchanged as one binary value laid out as: y value, a boolean
+ * "x is null" flag, then the x value (only when x is not null). Binary-typed values are written
+ * as a 4-byte length followed by their raw bytes.
+ */
+public abstract class TableMaxMinByBaseAccumulator implements TableAccumulator {
+
+  private static final String UNSUPPORTED_TYPE_MESSAGE =
+      "Unsupported data type in MaxBy/MinBy: %s";
+
+  protected final TSDataType xDataType;
+
+  protected final TSDataType yDataType;
+
+  /** Current extreme (max or min) of y; only meaningful once {@code initResult} is true. */
+  private final TsPrimitiveType yExtremeValue;
+
+  /** Value of x paired with {@code yExtremeValue}; only meaningful when {@code xNull} is false. */
+  private final TsPrimitiveType xResult;
+
+  /** True when the x paired with the current extreme y is null. */
+  private boolean xNull = true;
+
+  /** True once at least one non-null y has been accumulated. */
+  private boolean initResult;
+
+  /**
+   * @param xDataType type of the returned column x.
+   * @param yDataType type of the column y whose extreme value is tracked.
+   */
+  protected TableMaxMinByBaseAccumulator(TSDataType xDataType, TSDataType yDataType) {
+    this.xDataType = xDataType;
+    this.yDataType = yDataType;
+    this.xResult = TsPrimitiveType.getByType(xDataType);
+    this.yExtremeValue = TsPrimitiveType.getByType(yDataType);
+  }
+
+  /**
+   * Accumulates raw input rows.
+   *
+   * @param arguments exactly two columns laid out as {@code | x | y |}.
+   * @throws UnSupportedDataTypeException if the y data type is not handled.
+   */
+  @Override
+  public void addInput(Column[] arguments) {
+    checkArgument(arguments.length == 2, "Length of input Column[] for MaxBy/MinBy should be 2");
+    switch (yDataType) {
+      case INT32:
+      case DATE:
+        addIntInput(arguments);
+        return;
+      case INT64:
+      case TIMESTAMP:
+        addLongInput(arguments);
+        return;
+      case FLOAT:
+        addFloatInput(arguments);
+        return;
+      case DOUBLE:
+        addDoubleInput(arguments);
+        return;
+      case STRING:
+      case TEXT:
+      case BLOB:
+        addBinaryInput(arguments);
+        return;
+      case BOOLEAN:
+        addBooleanInput(arguments);
+        return;
+      default:
+        throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType));
+    }
+  }
+
+  /**
+   * Merges serialized partial states, one per non-null position of {@code argument}.
+   *
+   * @param argument binary column whose values were produced by {@link #evaluateIntermediate}.
+   */
+  @Override
+  public void addIntermediate(Column argument) {
+    checkArgument(
+        argument instanceof BinaryColumn || argument instanceof RunLengthEncodedColumn,
+        "intermediate input and output of max_by/min_by should be BinaryColumn");
+
+    for (int i = 0; i < argument.getPositionCount(); i++) {
+      if (argument.isNull(i)) {
+        continue;
+      }
+
+      byte[] bytes = argument.getBinary(i).getValues();
+      updateFromBytesIntermediateInput(bytes);
+    }
+  }
+
+  /** Statistics are not supported by this accumulator; always throws. */
+  @Override
+  public void addStatistics(Statistics[] statistics) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Appends the serialized partial state, or null when no non-null y has been seen. */
+  @Override
+  public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+    checkArgument(
+        columnBuilder instanceof BinaryColumnBuilder,
+        "intermediate input and output of Max_By/Min_By should be BinaryColumn");
+
+    if (!initResult) {
+      columnBuilder.appendNull();
+      return;
+    }
+    columnBuilder.writeBinary(new Binary(serialize()));
+  }
+
+  /** Appends the x paired with the extreme y, or null when there is no such non-null x. */
+  @Override
+  public void evaluateFinal(ColumnBuilder columnBuilder) {
+    if (!initResult) {
+      columnBuilder.appendNull();
+      return;
+    }
+    writeX(columnBuilder);
+  }
+
+  /** Clears the accumulated state so the instance can be reused. */
+  @Override
+  public void reset() {
+    initResult = false;
+    xNull = true;
+    this.xResult.reset();
+    this.yExtremeValue.reset();
+  }
+
+  /** Always returns false. */
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  private void addIntInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateIntResult(column[1].getInt(i), column[0], i);
+      }
+    }
+  }
+
+  // The first non-null y always initializes the state; afterwards check(...) decides.
+  private void updateIntResult(int yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getInt())) {
+      initResult = true;
+      yExtremeValue.setInt(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addLongInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateLongResult(long yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getLong())) {
+      initResult = true;
+      yExtremeValue.setLong(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addFloatInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateFloatResult(float yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getFloat())) {
+      initResult = true;
+      yExtremeValue.setFloat(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addDoubleInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateDoubleResult(double yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getDouble())) {
+      initResult = true;
+      yExtremeValue.setDouble(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addBinaryInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateBinaryResult(column[1].getBinary(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateBinaryResult(Binary yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getBinary())) {
+      initResult = true;
+      yExtremeValue.setBinary(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  private void addBooleanInput(Column[] column) {
+    int count = column[1].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (!column[1].isNull(i)) {
+        updateBooleanResult(column[1].getBoolean(i), column[0], i);
+      }
+    }
+  }
+
+  private void updateBooleanResult(boolean yValue, Column xColumn, int xIndex) {
+    if (!initResult || check(yValue, yExtremeValue.getBoolean())) {
+      initResult = true;
+      yExtremeValue.setBoolean(yValue);
+      updateX(xColumn, xIndex);
+    }
+  }
+
+  /** Appends the stored x (or null) to {@code columnBuilder} according to the x data type. */
+  private void writeX(ColumnBuilder columnBuilder) {
+    if (xNull) {
+      columnBuilder.appendNull();
+      return;
+    }
+    switch (xDataType) {
+      case INT32:
+      case DATE:
+        columnBuilder.writeInt(xResult.getInt());
+        break;
+      case INT64:
+      case TIMESTAMP:
+        columnBuilder.writeLong(xResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(xResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(xResult.getDouble());
+        break;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        columnBuilder.writeBinary(xResult.getBinary());
+        break;
+      case BOOLEAN:
+        columnBuilder.writeBoolean(xResult.getBoolean());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
+    }
+  }
+
+  /** Copies the x at {@code xIndex} into the stored result, remembering whether it is null. */
+  private void updateX(Column xColumn, int xIndex) {
+    if (xColumn.isNull(xIndex)) {
+      xNull = true;
+      return;
+    }
+    xNull = false;
+    switch (xDataType) {
+      case INT32:
+      case DATE:
+        xResult.setInt(xColumn.getInt(xIndex));
+        break;
+      case INT64:
+      case TIMESTAMP:
+        xResult.setLong(xColumn.getLong(xIndex));
+        break;
+      case FLOAT:
+        xResult.setFloat(xColumn.getFloat(xIndex));
+        break;
+      case DOUBLE:
+        xResult.setDouble(xColumn.getDouble(xIndex));
+        break;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        xResult.setBinary(xColumn.getBinary(xIndex));
+        break;
+      case BOOLEAN:
+        xResult.setBoolean(xColumn.getBoolean(xIndex));
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
+    }
+  }
+
+  /** Serializes the state as: y value, x-is-null flag, then x value when x is not null. */
+  private byte[] serialize() {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    try {
+      writeIntermediateToStream(yDataType, yExtremeValue, dataOutputStream);
+      dataOutputStream.writeBoolean(xNull);
+      if (!xNull) {
+        writeIntermediateToStream(xDataType, xResult, dataOutputStream);
+      }
+    } catch (IOException e) {
+      throw new UnsupportedOperationException(
+          "Failed to serialize intermediate result for MaxBy/MinBy accumulator.", e);
+    }
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  /** Writes one value of the given type to the stream in the intermediate-state format. */
+  private void writeIntermediateToStream(
+      TSDataType dataType, TsPrimitiveType value, DataOutputStream dataOutputStream)
+      throws IOException {
+    switch (dataType) {
+      case INT32:
+      case DATE:
+        dataOutputStream.writeInt(value.getInt());
+        break;
+      case INT64:
+      case TIMESTAMP:
+        dataOutputStream.writeLong(value.getLong());
+        break;
+      case FLOAT:
+        dataOutputStream.writeFloat(value.getFloat());
+        break;
+      case DOUBLE:
+        dataOutputStream.writeDouble(value.getDouble());
+        break;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        // Write the raw bytes, not a String rendering: the reader takes exactly `length` bytes,
+        // so the prefix must be a byte count, and DataOutputStream#writeBytes(String) keeps only
+        // the low byte of each char, which corrupts non-ASCII text and arbitrary BLOB content.
+        byte[] content = value.getBinary().getValues();
+        dataOutputStream.writeInt(content.length);
+        dataOutputStream.write(content, 0, content.length);
+        break;
+      case BOOLEAN:
+        dataOutputStream.writeBoolean(value.getBoolean());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, dataType));
+    }
+  }
+
+  /**
+   * Decodes one serialized partial state and merges it into this accumulator.
+   *
+   * <p>The decoded x is placed in a one-row column so the same update path as raw input can be
+   * reused.
+   */
+  private void updateFromBytesIntermediateInput(byte[] bytes) {
+    int offset = 0;
+    // Use Column to store x value
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(xDataType));
+    ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
+    switch (yDataType) {
+      case INT32:
+      case DATE:
+        int intExtreme = BytesUtils.bytesToInt(bytes, offset);
+        offset += Integer.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateIntResult(intExtreme, columnBuilder.build(), 0);
+        break;
+      case INT64:
+      case TIMESTAMP:
+        long longExtreme = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, offset);
+        offset += Long.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateLongResult(longExtreme, columnBuilder.build(), 0);
+        break;
+      case FLOAT:
+        float floatExtreme = BytesUtils.bytesToFloat(bytes, offset);
+        offset += Float.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateFloatResult(floatExtreme, columnBuilder.build(), 0);
+        break;
+      case DOUBLE:
+        double doubleExtreme = BytesUtils.bytesToDouble(bytes, offset);
+        offset += Double.BYTES;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateDoubleResult(doubleExtreme, columnBuilder.build(), 0);
+        break;
+      case STRING:
+      case TEXT:
+      case BLOB:
+        int length = BytesUtils.bytesToInt(bytes, offset);
+        offset += Integer.BYTES;
+        Binary binaryExtreme = new Binary(BytesUtils.subBytes(bytes, offset, length));
+        offset += length;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateBinaryResult(binaryExtreme, columnBuilder.build(), 0);
+        break;
+      case BOOLEAN:
+        boolean booleanExtreme = BytesUtils.bytesToBool(bytes, offset);
+        offset += 1;
+        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+        updateBooleanResult(booleanExtreme, columnBuilder.build(), 0);
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType));
+    }
+  }
+
+  /**
+   * Reads the x-is-null flag at {@code offset} and, when x is present, the x value that follows
+   * it, appending the result (value or null) to {@code columnBuilder}.
+   */
+  private void readXFromBytesIntermediateInput(
+      byte[] bytes, int offset, ColumnBuilder columnBuilder) {
+    boolean isXNull = BytesUtils.bytesToBool(bytes, offset);
+    offset += 1;
+    if (isXNull) {
+      columnBuilder.appendNull();
+      return;
+    }
+    switch (xDataType) {
+      case INT32:
+      case DATE:
+        columnBuilder.writeInt(BytesUtils.bytesToInt(bytes, offset));
+        break;
+      case INT64:
+      case TIMESTAMP:
+        columnBuilder.writeLong(BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, offset));
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(BytesUtils.bytesToFloat(bytes, offset));
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(BytesUtils.bytesToDouble(bytes, offset));
+        break;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        int length = BytesUtils.bytesToInt(bytes, offset);
+        offset += Integer.BYTES;
+        columnBuilder.writeBinary(new Binary(BytesUtils.subBytes(bytes, offset, length)));
+        break;
+      case BOOLEAN:
+        columnBuilder.writeBoolean(BytesUtils.bytesToBool(bytes, offset));
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
+    }
+  }
+
+  /**
+   * @param yValue Input y.
+   * @param yExtremeValue Current extreme value of y.
+   * @return True if yValue is the new extreme value.
+   */
+  protected abstract boolean check(int yValue, int yExtremeValue);
+
+  protected abstract boolean check(long yValue, long yExtremeValue);
+
+  protected abstract boolean check(float yValue, float yExtremeValue);
+
+  protected abstract boolean check(double yValue, double yExtremeValue);
+
+  protected abstract boolean check(Binary yValue, Binary yExtremeValue);
+
+  protected abstract boolean check(boolean yValue, boolean yExtremeValue);
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
new file mode 100644
index 00000000000..d4fbadae512
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+/**
+ * Accumulator for {@code min_by(x, y)}: keeps the x value that accompanies the smallest y seen so
+ * far.
+ *
+ * <p>Every {@code check} overload is a strict "less than" test, so when several rows share the
+ * minimum y the x of the first such row is kept.
+ */
+public class TableMinByAccumulator extends TableMaxMinByBaseAccumulator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(TableMinByAccumulator.class);
+
+  /**
+   * @param xDataType type of the returned column x.
+   * @param yDataType type of the column y whose minimum is tracked.
+   */
+  protected TableMinByAccumulator(TSDataType xDataType, TSDataType yDataType) {
+    super(xDataType, yDataType);
+  }
+
+  @Override
+  protected boolean check(int yValue, int yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(long yValue, long yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(float yValue, float yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(double yValue, double yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(Binary yValue, Binary yExtremeValue) {
+    return yValue.compareTo(yExtremeValue) < 0;
+  }
+
+  @Override
+  protected boolean check(boolean yValue, boolean yExtremeValue) {
+    // With false < true, yValue is strictly smaller only when it is false and the current minimum
+    // is still true. Returning !yValue alone would also replace the stored x on a false/false
+    // tie, unlike the strict comparisons used for every other type.
+    return !yValue && yExtremeValue;
+  }
+
+  /** Returns the shallow size of this instance as computed by {@link RamUsageEstimator}. */
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE;
+  }
+
+  /** Returns a new, empty accumulator configured with the same x and y data types. */
+  @Override
+  public TableAccumulator copy() {
+    return new TableMinByAccumulator(xDataType, yDataType);
+  }
+}