This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch fixObjectFunctionsAndAddTests
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5d607eba0521a53c4f98e5b6e2e4a7c14be58c20
Author: shuwenwei <[email protected]>
AuthorDate: Thu Nov 27 18:44:38 2025 +0800

    add functions for object type and add tests
---
 .../it/query/recent/IoTDBObjectTypeQueryIT.java    | 302 +++++++++++++++++++++
 .../iotdb/udf/api/relational/access/Record.java    |  35 ++-
 .../confignode1conf/iotdb-system.properties        |   2 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   8 +
 .../operator/process/function/partition/Slice.java |  15 +
 .../expression/PatternExpressionComputation.java   |   5 +-
 .../aggregation/MaskedRecordIterator.java          |   2 -
 .../relational/aggregation}/RecordIterator.java    |  17 +-
 .../UserDefinedAggregateFunctionAccumulator.java   |   1 -
 .../GroupedUserDefinedAggregateAccumulator.java    |   2 +-
 .../relational/ColumnTransformerBuilder.java       |  12 +-
 .../plan/analyze/ClusterPartitionFetcher.java      |   4 +
 .../plan/planner/OperatorTreeGenerator.java        |   1 +
 .../plan/planner/TableOperatorGenerator.java       |   6 +-
 .../plan/node/write/RelationalInsertRowsNode.java  |   3 +
 .../relational/metadata/TableMetadataImpl.java     |   7 +-
 .../udf/UserDefineScalarFunctionTransformer.java   |   2 +-
 .../AbstractCastFunctionColumnTransformer.java     |  12 +
 ...r.java => AbstractLengthColumnTransformer.java} |  15 +-
 .../scalar/CastFunctionColumnTransformer.java      |   3 +
 .../unary/scalar/LengthColumnTransformer.java      |  30 +-
 .../scalar/ObjectLengthColumnTransformer.java}     |  23 +-
 .../unary/scalar/ReadObjectColumnTransformer.java  |  44 +--
 .../scalar/TryCastFunctionColumnTransformer.java   |   3 +
 .../org/apache/iotdb/db/utils/ObjectTypeUtils.java | 124 +++++++++
 .../object/ObjectTypeCompactionTest.java           | 259 ++++++++++++++++++
 .../src/main/thrift/datanode.thrift                |   8 +
 27 files changed, 842 insertions(+), 103 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java
new file mode 100644
index 00000000000..45b32a57b07
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.utils.Binary;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.LocalDate;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBObjectTypeQueryIT {
+
+  private static final String DATABASE_NAME = "test";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+    try (Connection connection = 
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE " + DATABASE_NAME);
+      statement.execute("USE " + DATABASE_NAME);
+      statement.execute(
+          "CREATE TABLE table1(device STRING TAG, s4 DATE FIELD, s5 TIMESTAMP 
FIELD, s6 BLOB FIELD, s7 STRING FIELD, s8 OBJECT FIELD, s9 OBJECT FIELD)");
+      for (int i = 1; i <= 10; i++) {
+        for (int j = 0; j < 10; j++) {
+          statement.execute(
+              String.format(
+                  "insert into table1(time, device, s4, s5, s6, s7, s8) "
+                      + "values(%d, '%s', '%s', %d, %s, '%s', %s)",
+                  j,
+                  "d" + i,
+                  LocalDate.of(2024, 5, i % 31 + 1),
+                  j,
+                  "X'cafebabe'",
+                  j,
+                  "to_object(true, 0, X'cafebabe')"));
+          if (i == 10 && j == 9) {
+            statement.execute(
+                String.format(
+                    "insert into table1(time, device, s4, s5, s6, s7, s8) "
+                        + "values(%d, '%s', '%s', %d, %s, '%s', %s)",
+                    j, "d" + i, LocalDate.of(2024, 5, i % 31 + 1), j, 
"X'cafebabe'", j, "null"));
+          }
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testObjectLength() throws IoTDBConnectionException, 
StatementExecutionException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("USE " + DATABASE_NAME);
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement("select length(s8) from table1 limit 
1");
+      SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        long length = iterator.getLong(1);
+        Assert.assertEquals(4, length);
+      }
+    }
+  }
+
+  @Test
+  public void testReadObject() throws IoTDBConnectionException, 
StatementExecutionException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("USE " + DATABASE_NAME);
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement("select read_object(s8) from table1 
where device = 'd2'");
+      SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+      byte[] expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, 
(byte) 0xBE};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select read_object(s8, 1) from table1 where device = 'd3'");
+      iterator = sessionDataSet.iterator();
+      expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select read_object(s8, 1, 2) from table1 where device = 'd1'");
+      iterator = sessionDataSet.iterator();
+      expected = new byte[] {(byte) 0xFE, (byte) 0xBA};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select read_object(s8, 1, 1000) from table1 where device = 
'd1'");
+      iterator = sessionDataSet.iterator();
+      expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select count(*) from table1 where device = 'd1' and s6 = 
read_object(s8)");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        long count = iterator.getLong(1);
+        Assert.assertEquals(10, count);
+      }
+
+      // read_object are not pushed down. Read remote files
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select read_object(t1_s8) from (select t1.s8 as t1_s8, t2.s8 as 
t2_s8 from table1 as t1 inner join table1 as t2 using(time))");
+      iterator = sessionDataSet.iterator();
+      expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 
0xBE};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+    }
+  }
+
+  @Test
+  public void testFunctionAndClauses()
+      throws IoTDBConnectionException, StatementExecutionException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("USE " + DATABASE_NAME);
+
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement(
+              "select length(s8) from table1 where device = 'd2' and s8 is not 
null limit 1");
+      SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals(4, iterator.getLong(1));
+      }
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select count(s8), first(s8), last(s8), first_by(s8, time), 
last_by(s8, time) from table1 where device = 'd1' and cast(s8 as string) = 
'(Object) 4 B' and try_cast(s8 as string) = '(Object) 4 B'");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals(10, iterator.getLong(1));
+        Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+        Assert.assertEquals("(Object) 4 B", iterator.getString(3));
+        Assert.assertEquals("(Object) 4 B", iterator.getString(4));
+        Assert.assertEquals("(Object) 4 B", iterator.getString(5));
+      }
+
+      sessionDataSet = session.executeQueryStatement("select coalesce(s9, s8) 
from table1");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals("(Object) 4 B", iterator.getString(1));
+      }
+
+      // MATCH_RECOGNIZE
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select m.cnt from table1 match_recognize (order by s8 
measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6 
= prev(B.s6)) as m"));
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select m.cnt from table1 match_recognize (partition by s8 
measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6 
= prev(B.s6)) as m"));
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select m.value from table1 match_recognize(partition by s6 
measures prev(s8) as value one row per match pattern (B+) define B as 
B.s6=prev(B.s6)) as m");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals("(Object) 4 B", iterator.getString(1));
+      }
+
+      // WHERE
+      session.executeQueryStatement(
+          "select time, s8 from table1 where device = 'd10' and s8 is not 
null");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+      }
+
+      // GROUP BY
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () -> session.executeNonQueryStatement("select count(*) from table1 
group by s8"));
+
+      // ORDER BY
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () -> session.executeNonQueryStatement("select count(*) from table1 
group by s8"));
+
+      // FILL
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select time, s8 from table1 where device = 'd10' fill 
method linear"));
+      session.executeQueryStatement(
+          "select time, s8 from table1 where device = 'd10' fill method 
previous");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+      }
+
+      // HAVING
+      session.executeQueryStatement(
+          "select device, count(s8) from table1 group by device having 
count(s8) > 0");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        long count = iterator.getLong(2);
+        Assert.assertEquals(10, count);
+      }
+
+      // WINDOW
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select *, nth_value(s8,2) over(partition by s8) from 
table1"));
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select *, nth_value(s8,2) over(order by s8) from table1"));
+      session.executeNonQueryStatement(
+          "select *, nth_value(s8,2) over(partition by device) from table1");
+      session.executeNonQueryStatement(
+          "select *, lead(s8) over(partition by device order by time) from 
table1");
+      session.executeNonQueryStatement(
+          "select *, first_value(s8) over(partition by device) from table1");
+      session.executeNonQueryStatement(
+          "select *, last_value(s8) over(partition by device) from table1");
+      session.executeNonQueryStatement(
+          "select *, lag(s8) over(partition by device order by time) from 
table1");
+
+      // Table-value function
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select * from session(data => table1 partition by s8, 
timecol => 'time', gap => 1ms)"));
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select * from session(data => table1 order by s8, timecol 
=> 'time', gap => 1ms)"));
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select * from hop(data => table1, timecol => 'time', slide => 
1ms, size => 2ms)");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        String str = iterator.getString("s8");
+        Assert.assertEquals("(Object) 4 B", str);
+      }
+    }
+  }
+}
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
index 8c6e2a7f357..c4baa5fc5c8 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
@@ -83,7 +83,7 @@ public interface Record {
    * Returns the Binary value at the specified column in this row.
    *
    * <p>Users need to ensure that the data type of the specified column is 
{@code TSDataType.TEXT},
-   * {@code TSDataType.STRING} or {@code TSDataType.BLOB}.
+   * {@code TSDataType.STRING} or {@code TSDataType.BLOB} or {@code 
TSDataType.OBJECT}.
    *
    * @param columnIndex index of the specified column
    * @return the Binary value at the specified column in this row
@@ -94,7 +94,7 @@ public interface Record {
    * Returns the String value at the specified column in this row.
    *
    * <p>Users need to ensure that the data type of the specified column is 
{@code TSDataType.TEXT}
-   * or {@code TSDataType.STRING}.
+   * or {@code TSDataType.STRING} or {@code TSDataType.OBJECT}.
    *
    * @param columnIndex index of the specified column
    * @return the String value at the specified column in this row
@@ -113,6 +113,37 @@ public interface Record {
 
   Object getObject(int columnIndex);
 
+  /**
+   * Returns the Binary representation of an object stored at the specified 
column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is 
{@code
+   * TSDataType.OBJECT}.
+   *
+   * <p>This method returns the entire binary data of the object and may 
require considerable memory
+   * if the stored object is large.
+   *
+   * @param columnIndex index of the specified column
+   * @return the Binary content of the object at the specified column
+   */
+  Binary readObject(int columnIndex);
+
+  /**
+   * Returns a partial Binary segment of an object stored at the specified 
column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is 
{@code
+   * TSDataType.OBJECT}.
+   *
+   * <p>This method enables reading a subset of the stored object without 
materializing the entire
+   * binary data in memory, which is useful for large objects and streaming 
access patterns.
+   *
+   * @param columnIndex index of the specified column
+   * @param offset byte offset of the subsection read
+   * @param length number of bytes to read starting from the offset. If length 
< 0, read the entire
+   *     binary data from offset.
+   * @return the Binary content of the object segment at the specified column
+   */
+  Binary readObject(int columnIndex, long offset, long length);
+
   /**
    * Returns the actual data type of the value at the specified column in this 
row.
    *
diff --git 
a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
 
b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
index b396e373f86..082ee7bf488 100644
--- 
a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
+++ 
b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
@@ -34,7 +34,7 @@ timestamp_precision=ms
 
data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
 
schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 schema_replication_factor=3
-data_replication_factor=3
+data_replication_factor=1
 udf_lib_dir=target/confignode1/ext/udf
 trigger_lib_dir=target/confignode1/ext/trigger
 pipe_lib_dir=target/confignode1/ext/pipe
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index dcc62fcd3c8..8f1b97b7196 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -201,6 +201,7 @@ import 
org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
 import org.apache.iotdb.db.trigger.executor.TriggerFireResult;
 import org.apache.iotdb.db.trigger.service.TriggerManagementService;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.metrics.type.AutoGauge;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -279,6 +280,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TPushSingleTopicMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage;
+import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
@@ -3051,6 +3053,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
+  @Override
+  public ByteBuffer readObject(TReadObjectReq req) {
+    return ObjectTypeUtils.readObjectContent(
+        req.getRelativePath(), req.getOffset(), req.getSize(), false);
+  }
+
   public void handleClientExit() {
     // Do nothing
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
index b7ff6f3e0fd..5e09ca5ff26 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
@@ -19,6 +19,7 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition;
 
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 import org.apache.iotdb.udf.api.relational.access.Record;
 import org.apache.iotdb.udf.api.type.Type;
 
@@ -186,6 +187,20 @@ public class Slice {
       return originalColumns[columnIndex].getObject(offset);
     }
 
+    @Override
+    public Binary readObject(int columnIndex, long offset, long length) {
+      if (getDataType(columnIndex) == Type.OBJECT) {
+        throw new UnsupportedOperationException("current column is not object 
column");
+      }
+      Binary binary = getBinary(columnIndex);
+      return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, 
length, true).array());
+    }
+
+    @Override
+    public Binary readObject(int columnIndex) {
+      return readObject(columnIndex, 0L, -1);
+    }
+
     @Override
     public Type getDataType(int columnIndex) {
       return dataTypes.get(columnIndex);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
index 6b5032627b2..cb8cbdccbb6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
@@ -35,6 +35,7 @@ import org.apache.tsfile.read.common.type.BlobType;
 import org.apache.tsfile.read.common.type.BooleanType;
 import org.apache.tsfile.read.common.type.DoubleType;
 import org.apache.tsfile.read.common.type.FloatType;
+import org.apache.tsfile.read.common.type.ObjectType;
 import org.apache.tsfile.read.common.type.Type;
 
 import java.util.ArrayList;
@@ -161,7 +162,9 @@ public class PatternExpressionComputation {
       return partition.getFloat(channel, position);
     } else if (type instanceof DoubleType) {
       return partition.getDouble(channel, position);
-    } else if (type instanceof AbstractVarcharType || type instanceof 
BlobType) {
+    } else if (type instanceof AbstractVarcharType
+        || type instanceof BlobType
+        || type instanceof ObjectType) {
       return partition.getBinary(channel, position);
     } else {
       throw new SemanticException("Unsupported type: " + 
type.getClass().getSimpleName());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
index 5237fcfd12e..48095927eda 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
@@ -19,8 +19,6 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
-
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.read.common.type.Type;
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
similarity index 88%
rename from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
index 6f5813955dd..f2f306f2bc2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.udf.access;
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 import org.apache.iotdb.udf.api.relational.access.Record;
 import org.apache.iotdb.udf.api.type.Type;
 
@@ -138,6 +139,20 @@ public class RecordIterator implements Iterator<Record> {
       return childrenColumns.get(columnIndex).getObject(index);
     }
 
+    @Override
+    public Binary readObject(int columnIndex, long offset, long length) {
+      if (getDataType(columnIndex) == Type.OBJECT) {
+        throw new UnsupportedOperationException("current column is not object 
column");
+      }
+      Binary binary = getBinary(columnIndex);
+      return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, 
length, true).array());
+    }
+
+    @Override
+    public Binary readObject(int columnIndex) {
+      return readObject(columnIndex, 0L, -1);
+    }
+
     @Override
     public Type getDataType(int columnIndex) {
       return 
UDFDataTypeTransformer.transformReadTypeToUDFDataType(dataTypes.get(columnIndex));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
index 64c0896b3a1..70e1c0c5d44 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
@@ -19,7 +19,6 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
 import org.apache.iotdb.udf.api.State;
 import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis;
 import org.apache.iotdb.udf.api.relational.AggregateFunction;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
index b3d24e923ae..9ac6b48db7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
@@ -19,9 +19,9 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.MaskedRecordIterator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.ObjectBigArray;
 import org.apache.iotdb.udf.api.State;
 import org.apache.iotdb.udf.api.relational.AggregateFunction;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
index 0378c80f042..19f81b78f68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
@@ -163,6 +163,7 @@ import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Ln
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Log10ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LongToBytesColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LowerColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.ObjectLengthColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrim2ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrimColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RadiansColumnTransformer;
@@ -243,7 +244,6 @@ import static 
org.apache.tsfile.read.common.type.DoubleType.DOUBLE;
 import static org.apache.tsfile.read.common.type.FloatType.FLOAT;
 import static org.apache.tsfile.read.common.type.IntType.INT32;
 import static org.apache.tsfile.read.common.type.LongType.INT64;
-import static org.apache.tsfile.read.common.type.ObjectType.OBJECT;
 import static org.apache.tsfile.read.common.type.StringType.STRING;
 
 public class ColumnTransformerBuilder
@@ -780,7 +780,9 @@ public class ColumnTransformerBuilder
     } else if 
(TableBuiltinScalarFunction.LENGTH.getFunctionName().equalsIgnoreCase(functionName))
 {
       ColumnTransformer first = this.process(children.get(0), context);
       if (children.size() == 1) {
-        return new LengthColumnTransformer(INT32, first);
+        return context.inputDataTypes.get(0) == TSDataType.OBJECT
+            ? new ObjectLengthColumnTransformer(INT64, first)
+            : new LengthColumnTransformer(INT64, first);
       }
     } else if 
(TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName))
 {
       ColumnTransformer first = this.process(children.get(0), context);
@@ -1456,10 +1458,10 @@ public class ColumnTransformerBuilder
         .equalsIgnoreCase(functionName)) {
       ColumnTransformer first = this.process(children.get(0), context);
       if (children.size() == 1) {
-        return new ReadObjectColumnTransformer(OBJECT, first, 
context.fragmentInstanceContext);
+        return new ReadObjectColumnTransformer(BLOB, first, 
context.fragmentInstanceContext);
       } else if (children.size() == 2) {
         return new ReadObjectColumnTransformer(
-            OBJECT,
+            BLOB,
             ((LongLiteral) children.get(1)).getParsedValue(),
             first,
             context.fragmentInstanceContext);
@@ -1468,7 +1470,7 @@ public class ColumnTransformerBuilder
         long length = ((LongLiteral) children.get(2)).getParsedValue();
         checkArgument(offset >= 0 && length >= 0);
         return new ReadObjectColumnTransformer(
-            OBJECT,
+            BLOB,
             ((LongLiteral) children.get(1)).getParsedValue(),
             ((LongLiteral) children.get(2)).getParsedValue(),
             first,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index 8c54fd640f8..2274762341b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -308,6 +308,10 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
     return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), 
req.getRegionRouteMap());
   }
 
+  public List<TRegionReplicaSet> getRegionReplicaSet(List<TConsensusGroupId> 
consensusGroupIds) {
+    return partitionCache.getRegionReplicaSet(consensusGroupIds);
+  }
+
   @Override
   public void invalidAllCache() {
     partitionCache.invalidAllCache();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 58164159183..44be3e69afb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -1435,6 +1435,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         case TEXT:
         case STRING:
         case BLOB:
+        case OBJECT:
           previousFill[i] =
               filter == null
                   ? new BinaryPreviousFill()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index d7d4a951cb8..c29ceaff04f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -268,6 +268,7 @@ import 
org.apache.tsfile.read.common.block.column.LongColumn;
 import org.apache.tsfile.read.common.type.BinaryType;
 import org.apache.tsfile.read.common.type.BlobType;
 import org.apache.tsfile.read.common.type.BooleanType;
+import org.apache.tsfile.read.common.type.ObjectType;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.read.common.type.TypeFactory;
 import org.apache.tsfile.read.filter.basic.Filter;
@@ -3825,6 +3826,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         case MAX:
         case MIN:
           if (BlobType.BLOB.equals(argumentType)
+              || ObjectType.OBJECT.equals(argumentType)
               || BinaryType.TEXT.equals(argumentType)
               || BooleanType.BOOLEAN.equals(argumentType)) {
             canUseStatistic = false;
@@ -3840,8 +3842,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             descendingCount++;
           }
 
-          // first/last/first_by/last_by aggregation with BLOB type can not 
use statistics
-          if (BlobType.BLOB.equals(argumentType)) {
+          // first/last/first_by/last_by aggregation with BLOB or OBJECT type 
can not use statistics
+          if (BlobType.BLOB.equals(argumentType) || 
ObjectType.OBJECT.equals(argumentType)) {
             canUseStatistic = false;
             break;
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 83f6bbec63e..2297ddcebdd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -207,6 +207,9 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
     for (int j = 0; j < insertRowNode.getDataTypes().length; j++) {
       if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) {
         Object[] values = insertRowNode.getValues();
+        if (values[j] == null) {
+          continue;
+        }
         byte[] binary = ((Binary) values[j]).getValues();
         ByteBuffer buffer = ByteBuffer.wrap(binary);
         boolean isEoF = buffer.get() == 1;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index cc9af5c9b72..b6242ec000f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -265,13 +265,14 @@ public class TableMetadataImpl implements Metadata {
       }
       return STRING;
     } else if 
(TableBuiltinScalarFunction.LENGTH.getFunctionName().equalsIgnoreCase(functionName))
 {
-      if (!(argumentTypes.size() == 1 && isCharType(argumentTypes.get(0)))) {
+      if (!(argumentTypes.size() == 1 && (isCharType(argumentTypes.get(0)))
+          || isObjectType(argumentTypes.get(0)))) {
         throw new SemanticException(
             "Scalar function "
                 + functionName.toLowerCase(Locale.ENGLISH)
-                + " only accepts one argument and it must be text or string 
data type.");
+                + " only accepts one argument and it must be text or string or 
object data type.");
       }
-      return INT32;
+      return INT64;
     } else if 
(TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName))
 {
       if (!(argumentTypes.size() == 1 && isCharType(argumentTypes.get(0)))) {
         throw new SemanticException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
index 279e3c06fd4..47fd40ed73f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.transformation.dag.column.udf;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.multi.MultiColumnTransformer;
 import org.apache.iotdb.udf.api.relational.ScalarFunction;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
index ee40a3ec6f8..4cd2358118f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
@@ -341,6 +341,7 @@ public abstract class AbstractCastFunctionColumnTransformer 
extends UnaryColumnT
         case TEXT:
         case STRING:
         case BLOB:
+        case OBJECT:
           returnType.writeBinary(columnBuilder, value);
           break;
         default:
@@ -393,4 +394,15 @@ public abstract class 
AbstractCastFunctionColumnTransformer extends UnaryColumnT
           String.format("Cannot cast %s to %s type", stringValue, 
returnType.getDisplayName()));
     }
   }
+
+  protected void castObject(ColumnBuilder columnBuilder, Binary value) {
+    String stringValue = 
BytesUtils.parseObjectByteArrayToString(value.getValues());
+    switch (returnType.getTypeEnum()) {
+      case STRING:
+        returnType.writeBinary(columnBuilder, 
BytesUtils.valueOf(String.valueOf(stringValue)));
+        break;
+      default:
+        throw new UnsupportedOperationException(String.format(ERROR_MSG, 
returnType.getTypeEnum()));
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
similarity index 77%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
index 00448c6f575..08eb4769166 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
@@ -24,12 +24,13 @@ import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColu
 
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
 
-public class LengthColumnTransformer extends UnaryColumnTransformer {
+public abstract class AbstractLengthColumnTransformer extends 
UnaryColumnTransformer {
 
-  public LengthColumnTransformer(Type returnType, ColumnTransformer 
childColumnTransformer) {
+  public AbstractLengthColumnTransformer(
+      Type returnType, ColumnTransformer childColumnTransformer) {
     super(returnType, childColumnTransformer);
   }
 
@@ -37,8 +38,7 @@ public class LengthColumnTransformer extends 
UnaryColumnTransformer {
   protected void doTransform(Column column, ColumnBuilder columnBuilder) {
     for (int i = 0, n = column.getPositionCount(); i < n; i++) {
       if (!column.isNull(i)) {
-        String currentValue = 
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
-        columnBuilder.writeInt(currentValue.length());
+        columnBuilder.writeLong(transformNonNullValue(column.getBinary(i)));
       } else {
         columnBuilder.appendNull();
       }
@@ -49,11 +49,12 @@ public class LengthColumnTransformer extends 
UnaryColumnTransformer {
   protected void doTransform(Column column, ColumnBuilder columnBuilder, 
boolean[] selection) {
     for (int i = 0, n = column.getPositionCount(); i < n; i++) {
       if (selection[i] && !column.isNull(i)) {
-        String currentValue = 
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
-        columnBuilder.writeInt(currentValue.length());
+        columnBuilder.writeLong(transformNonNullValue(column.getBinary(i)));
       } else {
         columnBuilder.appendNull();
       }
     }
   }
+
+  protected abstract long transformNonNullValue(Binary binary);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
index b9c8e31b1ab..624eabadc62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
@@ -67,6 +67,9 @@ public class CastFunctionColumnTransformer extends 
AbstractCastFunctionColumnTra
       case BLOB:
         castBlob(columnBuilder, childType.getBinary(column, i));
         break;
+      case OBJECT:
+        castObject(columnBuilder, childType.getBinary(column, i));
+        break;
       default:
         throw new UnsupportedOperationException(
             String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
index 00448c6f575..c94530c83d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
@@ -20,40 +20,18 @@
 package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
 
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
-import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
 
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
 
-public class LengthColumnTransformer extends UnaryColumnTransformer {
-
+public class LengthColumnTransformer extends AbstractLengthColumnTransformer {
   public LengthColumnTransformer(Type returnType, ColumnTransformer 
childColumnTransformer) {
     super(returnType, childColumnTransformer);
   }
 
   @Override
-  protected void doTransform(Column column, ColumnBuilder columnBuilder) {
-    for (int i = 0, n = column.getPositionCount(); i < n; i++) {
-      if (!column.isNull(i)) {
-        String currentValue = 
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
-        columnBuilder.writeInt(currentValue.length());
-      } else {
-        columnBuilder.appendNull();
-      }
-    }
-  }
-
-  @Override
-  protected void doTransform(Column column, ColumnBuilder columnBuilder, 
boolean[] selection) {
-    for (int i = 0, n = column.getPositionCount(); i < n; i++) {
-      if (selection[i] && !column.isNull(i)) {
-        String currentValue = 
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
-        columnBuilder.writeInt(currentValue.length());
-      } else {
-        columnBuilder.appendNull();
-      }
-    }
+  protected long transformNonNullValue(Binary binary) {
+    return binary.getStringValue(TSFileConfig.STRING_CHARSET).length();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
similarity index 57%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
index 5237fcfd12e..5d39c6f6af3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
@@ -17,26 +17,21 @@
  * under the License.
  */
 
-package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 
-import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
 
-import java.util.List;
-
-public class MaskedRecordIterator extends RecordIterator {
-  private final int[] selectedPositions;
-
-  public MaskedRecordIterator(
-      List<Column> childrenColumns, List<Type> dataTypes, AggregationMask 
mask) {
-    super(childrenColumns, dataTypes, mask.getSelectedPositionCount());
-    this.selectedPositions = mask.getSelectedPositions();
+public class ObjectLengthColumnTransformer extends 
AbstractLengthColumnTransformer {
+  public ObjectLengthColumnTransformer(Type returnType, ColumnTransformer 
childColumnTransformer) {
+    super(returnType, childColumnTransformer);
   }
 
   @Override
-  protected int getCurrentIndex() {
-    return selectedPositions[currentIndex++];
+  protected long transformNonNullValue(Binary binary) {
+    return ObjectTypeUtils.getObjectLength(binary);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
index 9504c6c2282..8c791191b42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
@@ -19,25 +19,18 @@
 
 package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
 
-import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
 import org.apache.iotdb.db.utils.ObjectTypeUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
 import java.util.Optional;
 
 public class ReadObjectColumnTransformer extends UnaryColumnTransformer {
@@ -107,35 +100,14 @@ public class ReadObjectColumnTransformer extends 
UnaryColumnTransformer {
   }
 
   private Binary readObject(Binary binary) {
-    File file = ObjectTypeUtils.getObjectPathFromBinary(binary);
-    long actualReadSize = getActualReadSize(file);
+    Pair<Long, String> ObjectLengthPathPair = 
ObjectTypeUtils.parseObjectBinary(binary);
+    long fileLength = ObjectLengthPathPair.getLeft();
+    String relativePath = ObjectLengthPathPair.getRight();
+    int actualReadSize =
+        ObjectTypeUtils.getActualReadSize(relativePath, fileLength, offset, 
length);
     fragmentInstanceContext.ifPresent(
         context -> 
context.getMemoryReservationContext().reserveMemoryCumulatively(actualReadSize));
-    byte[] bytes = new byte[(int) actualReadSize];
-    ByteBuffer buffer = ByteBuffer.wrap(bytes);
-    try (FileChannel fileChannel = FileChannel.open(file.toPath(), 
StandardOpenOption.READ)) {
-      fileChannel.read(buffer, offset);
-    } catch (IOException e) {
-      throw new IoTDBRuntimeException(e, 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-    }
-    return new Binary(bytes);
-  }
-
-  private long getActualReadSize(File file) {
-    long fileSize = file.length();
-    if (offset >= fileSize) {
-      throw new SemanticException(
-          String.format(
-              "offset %d is greater than object size %d, file path is %s",
-              offset, fileSize, file.getAbsolutePath()));
-    }
-    long actualReadSize = Math.min(length < 0 ? fileSize : length, fileSize - 
offset);
-    if (actualReadSize > Integer.MAX_VALUE) {
-      throw new SemanticException(
-          String.format(
-              "Read object size %s is too large (size > 2G), file path is %s",
-              actualReadSize, file.getAbsolutePath()));
-    }
-    return actualReadSize;
+    return new Binary(
+        ObjectTypeUtils.readObjectContent(relativePath, offset, 
actualReadSize, true).array());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
index c25fd321c34..419d1bbabf3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
@@ -69,6 +69,9 @@ public class TryCastFunctionColumnTransformer extends 
AbstractCastFunctionColumn
         case BLOB:
           castBlob(columnBuilder, childType.getBinary(column, i));
           break;
+        case OBJECT:
+          castObject(columnBuilder, childType.getBinary(column, i));
+          break;
         default:
           throw new UnsupportedOperationException(
               String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
index c153061a90d..af694585883 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
@@ -19,18 +19,38 @@
 
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.ObjectFileNotExist;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
+import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.thrift.TException;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 
 public class ObjectTypeUtils {
@@ -40,6 +60,110 @@ public class ObjectTypeUtils {
 
   private ObjectTypeUtils() {}
 
+  public static ByteBuffer readObjectContent(
+      Binary binary, long offset, long length, boolean mayNotInCurrentNode) {
+    Pair<Long, String> ObjectLengthPathPair = 
ObjectTypeUtils.parseObjectBinary(binary);
+    long fileLength = ObjectLengthPathPair.getLeft();
+    length = length < 0 ? fileLength : length;
+    String relativePath = ObjectLengthPathPair.getRight();
+    int actualReadSize =
+        ObjectTypeUtils.getActualReadSize(relativePath, fileLength, offset, 
length);
+    return ObjectTypeUtils.readObjectContent(
+        relativePath, offset, actualReadSize, mayNotInCurrentNode);
+  }
+
+  public static ByteBuffer readObjectContent(
+      String relativePath, long offset, long readSize, boolean 
mayNotInCurrentNode) {
+    Optional<File> objectFile = 
TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false);
+    if (objectFile.isPresent()) {
+      return readObjectContentFromLocalFile(objectFile.get(), offset, 
readSize);
+    }
+    if (mayNotInCurrentNode) {
+      return readObjectContentFromRemoteFile(relativePath, offset, readSize);
+    }
+    throw new ObjectFileNotExist(relativePath);
+  }
+
+  private static ByteBuffer readObjectContentFromLocalFile(File file, long 
offset, long readSize) {
+    byte[] bytes = new byte[(int) readSize];
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    try (FileChannel fileChannel = FileChannel.open(file.toPath(), 
StandardOpenOption.READ)) {
+      fileChannel.read(buffer, offset);
+    } catch (IOException e) {
+      throw new IoTDBRuntimeException(e, 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  private static ByteBuffer readObjectContentFromRemoteFile(
+      String relativePath, long offset, long readSize) {
+    byte[] bytes = new byte[(int) readSize];
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    TConsensusGroupId consensusGroupId =
+        new TConsensusGroupId(
+            TConsensusGroupType.DataRegion,
+            Integer.parseInt(Paths.get(relativePath).getName(0).toString()));
+    List<TRegionReplicaSet> regionReplicaSetList =
+        ClusterPartitionFetcher.getInstance()
+            .getRegionReplicaSet(Collections.singletonList(consensusGroupId));
+    TRegionReplicaSet regionReplicaSet = 
regionReplicaSetList.iterator().next();
+    final int batchSize = 1024 * 1024 * 1024;
+    final TReadObjectReq req = new TReadObjectReq();
+    req.setRelativePath(relativePath);
+    for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+      try (SyncDataNodeInternalServiceClient client =
+          Coordinator.getInstance()
+              .getInternalServiceClientManager()
+              .borrowClient(dataNodeLocation.getInternalEndPoint())) {
+        while (readSize > 0) {
+          req.setOffset(offset + buffer.position());
+          req.setSize(Math.min(readSize, batchSize));
+          readSize -= req.getSize();
+          ByteBuffer partial = client.readObject(req);
+          buffer.put(partial);
+        }
+      } catch (ClientManagerException | TException e) {
+        logger.error(e.getMessage(), e);
+        throw new IoTDBRuntimeException(e, 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      }
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  public static int getActualReadSize(String filePath, long fileSize, long 
offset, long length) {
+    if (offset >= fileSize) {
+      throw new SemanticException(
+          String.format(
+              "offset %d is greater than object size %d, file path is %s",
+              offset, fileSize, filePath));
+    }
+    long actualReadSize = Math.min(length < 0 ? fileSize : length, fileSize - 
offset);
+    if (actualReadSize > Integer.MAX_VALUE) {
+      throw new SemanticException(
+          String.format(
+              "Read object size %s is too large (size > 2G), file path is %s",
+              actualReadSize, filePath));
+    }
+    return (int) actualReadSize;
+  }
+
+  public static Pair<Long, String> parseObjectBinary(Binary binary) {
+    byte[] bytes = binary.getValues();
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    long length = buffer.getLong();
+    String relativeObjectFilePath =
+        new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
+    return new Pair<>(length, relativeObjectFilePath);
+  }
+
+  public static long getObjectLength(Binary binary) {
+    byte[] bytes = binary.getValues();
+    ByteBuffer wrap = ByteBuffer.wrap(bytes);
+    return wrap.getLong();
+  }
+
   public static File getObjectPathFromBinary(Binary binary) {
     byte[] bytes = binary.getValues();
     String relativeObjectFilePath =
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
new file mode 100644
index 00000000000..4487e8b704d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.object;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ColumnSchema;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class ObjectTypeCompactionTest extends AbstractCompactionTest {
+
+  private static final TableSchema tableSchema =
+      new TableSchema(
+          "t1",
+          Arrays.asList(
+              new ColumnSchema("device", TSDataType.STRING, 
ColumnCategory.TAG),
+              new ColumnSchema("s1", TSDataType.OBJECT, 
ColumnCategory.FIELD)));
+
+  private String threadName;
+  private File objectDir;
+
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, 
InterruptedException {
+    this.threadName = Thread.currentThread().getName();
+    Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
+    DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+    createTable("t1", 1);
+    super.setUp();
+    try {
+      objectDir = new 
File(TierManager.getInstance().getNextFolderForObjectFile());
+    } catch (DiskSpaceInsufficientException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    Thread.currentThread().setName(threadName);
+    DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+    File[] files = objectDir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        Files.delete(file.toPath());
+      }
+    }
+  }
+
+  public void createTable(String tableName, long ttl) {
+    TsTable tsTable = new TsTable(tableName);
+    tsTable.addColumnSchema(new TagColumnSchema("device", TSDataType.STRING));
+    tsTable.addColumnSchema(
+        new FieldColumnSchema("s1", TSDataType.OBJECT, TSEncoding.PLAIN, 
CompressionType.LZ4));
+    tsTable.addProp(TsTable.TTL_PROPERTY, ttl + "");
+    DataNodeTableCache.getInstance().preUpdateTable(this.COMPACTION_TEST_SG, 
tsTable, null);
+    
DataNodeTableCache.getInstance().commitUpdateTable(this.COMPACTION_TEST_SG, 
tableName, null);
+  }
+
+  @Test
+  public void testSeqCompactionWithTTL() throws IOException, 
WriteProcessException {
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+    tsFileManager.add(pair1.getLeft(), true);
+    tsFileManager.add(pair2.getLeft(), true);
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(true),
+            true,
+            new ReadChunkCompactionPerformer(),
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertFalse(pair1.getRight().exists());
+    Assert.assertTrue(pair2.getRight().exists());
+  }
+
+  @Test
+  public void testUnseqCompactionWithTTL() throws IOException, 
WriteProcessException {
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+    tsFileManager.add(pair1.getLeft(), false);
+    tsFileManager.add(pair2.getLeft(), false);
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(false),
+            false,
+            new FastCompactionPerformer(false),
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertFalse(pair2.getRight().exists());
+    Assert.assertTrue(pair1.getRight().exists());
+  }
+
+  @Test
+  public void testUnseqCompactionWithReadPointWithTTL() throws IOException, 
WriteProcessException {
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+    tsFileManager.add(pair1.getLeft(), false);
+    tsFileManager.add(pair2.getLeft(), false);
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(false),
+            false,
+            new ReadPointCompactionPerformer(),
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertTrue(pair1.getRight().exists());
+    Assert.assertFalse(pair2.getRight().exists());
+  }
+
+  @Test
+  public void testCrossCompactionWithTTL() throws IOException, 
WriteProcessException {
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(true, System.currentTimeMillis() + 100000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+    tsFileManager.add(pair1.getLeft(), true);
+    tsFileManager.add(pair2.getLeft(), false);
+    CrossSpaceCompactionTask task =
+        new CrossSpaceCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(true),
+            tsFileManager.getTsFileList(false),
+            new FastCompactionPerformer(true),
+            1,
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertFalse(pair2.getRight().exists());
+    Assert.assertTrue(pair1.getRight().exists());
+  }
+
+  @Test
+  public void testSettleCompaction() throws IOException, WriteProcessException 
{
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+    tsFileManager.add(pair1.getLeft(), true);
+    tsFileManager.add(pair2.getLeft(), true);
+    SettleCompactionTask task =
+        new SettleCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(true),
+            Collections.emptyList(),
+            true,
+            new FastCompactionPerformer(true),
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertFalse(pair1.getRight().exists());
+    Assert.assertTrue(pair2.getRight().exists());
+  }
+
+  private Pair<TsFileResource, File> generateTsFileAndObject(boolean seq, long 
timestamp)
+      throws IOException, WriteProcessException {
+    TsFileResource resource = createEmptyFileAndResource(seq);
+    Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin");
+    byte[] content = new byte[100];
+    for (int i = 0; i < 100; i++) {
+      content[i] = (byte) i;
+    }
+    Files.write(testFile1, content);
+    String relativePath = testFile1.toFile().getName();
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + 
relativePath.length());
+    buffer.putLong(100L);
+    buffer.put(BytesUtils.stringToBytes(relativePath));
+    buffer.flip();
+    IDeviceID deviceID = new StringArrayDeviceID("t1", "d1");
+    try (TsFileIOWriter writer = new TsFileIOWriter(resource.getTsFile())) {
+      writer.getSchema().registerTableSchema(tableSchema);
+      writer.startChunkGroup(deviceID);
+      AlignedChunkWriterImpl alignedChunkWriter =
+          new AlignedChunkWriterImpl(Arrays.asList(new MeasurementSchema("s1", 
TSDataType.OBJECT)));
+      alignedChunkWriter.write(timestamp);
+      alignedChunkWriter.write(timestamp, new Binary(buffer.array()), false);
+      alignedChunkWriter.sealCurrentPage();
+      alignedChunkWriter.writeToFileWriter(writer);
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    resource.updateStartTime(deviceID, 1);
+    resource.updateEndTime(deviceID, 1);
+    resource.serialize();
+    resource.deserialize();
+    resource.setStatus(TsFileResourceStatus.NORMAL);
+    return new Pair<>(resource, testFile1.toFile());
+  }
+}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index caaf44c16a7..64de32597c1 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -773,6 +773,12 @@ struct TKillQueryInstanceReq {
   2: optional string allowedUsername
 }
 
+struct TReadObjectReq {
+  1: string relativePath
+  2: optional i64 offset
+  3: optional i64 size
+}
+
 /**
 * END: Used for EXPLAIN ANALYZE
 **/
@@ -1257,6 +1263,8 @@ service IDataNodeRPCService {
    * Write an audit log entry to the DataNode's AuditEventLogger
    */
   common.TSStatus writeAuditLog(TAuditLogReq req);
+
+  binary readObject(TReadObjectReq req);
 }
 
 service MPPDataExchangeService {

Reply via email to