This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 62ffffc3 deps: bump datafusion (#1445)
62ffffc3 is described below

commit 62ffffc3057b9159d7a67c4909a3c0e3fe670fc6
Author: Ruixiang Tan <[email protected]>
AuthorDate: Fri Feb 23 15:06:45 2024 +0800

    deps: bump datafusion (#1445)
    
    ## Rationale
    Close #1461
    
    ## Detailed Changes
    Bump datafusion to
    https://github.com/CeresDB/arrow-datafusion/commits/e21b03154, which is
    version 33.
    
    Some important breaking changes:
    - https://github.com/apache/arrow-datafusion/pull/7920
    - https://github.com/apache/arrow-datafusion/issues/9109
    
    ## Test Plan
    CI
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 Cargo.lock                                         | 549 ++++++++++++---------
 Cargo.toml                                         |  22 +-
 .../cases/common/dml/issue-1087.result             |  17 +-
 .../cases/common/dml/issue-302.result              |   2 +-
 .../cases/common/dml/issue-341.result              |  12 +-
 integration_tests/cases/common/dml/issue-59.result |   4 +-
 .../cases/common/explain/explain.result            |   2 +-
 .../cases/common/function/aggregate.result         |  43 ++
 .../cases/common/function/aggregate.sql            |  28 ++
 .../cases/common/optimizer/optimizer.result        |   2 +-
 .../cases/env/cluster/ddl/partition_table.result   |   8 +-
 .../cases/env/cluster/ddl/partition_table.sql      |   4 +
 .../cases/env/local/ddl/query-plan.result          |  27 +-
 .../cases/env/local/ddl/query-plan.sql             |   9 +
 .../src/instance/reorder_memtable.rs               |  11 +-
 src/analytic_engine/src/memtable/skiplist/iter.rs  |   5 +
 .../src/row_iter/record_batch_stream.rs            |   1 +
 src/analytic_engine/src/table/mod.rs               |   1 +
 src/common_types/src/datum.rs                      |  16 +-
 src/common_types/src/projected_schema.rs           |   6 +-
 src/common_types/src/record_batch.rs               |  58 ++-
 src/components/parquet_ext/src/meta_data.rs        |   3 +-
 src/components/parquet_ext/src/prune/min_max.rs    |   4 +-
 .../src/dist_sql_query/physical_plan.rs            |  18 +-
 .../src/dist_sql_query/test_util.rs                |   6 +-
 src/df_operator/src/scalar.rs                      |   3 +-
 src/df_operator/src/udaf.rs                        |   3 +-
 src/interpreters/src/insert.rs                     |   2 +-
 src/interpreters/src/tests.rs                      |  18 +-
 src/proxy/src/grpc/prom_query.rs                   |   2 +-
 src/proxy/src/influxdb/types.rs                    |   2 +-
 src/query_engine/src/datafusion_impl/mod.rs        |   4 +-
 .../physical_optimizer/repartition.rs              |   6 +-
 .../physical_plan_extension/prom_align.rs          |  18 +-
 .../src/datafusion_impl/task_context.rs            |   3 +-
 src/query_frontend/src/influxql/planner.rs         |   2 +-
 src/query_frontend/src/logical_optimizer/mod.rs    |   3 +-
 .../src/logical_optimizer/type_conversion.rs       |   9 +-
 src/query_frontend/src/parser.rs                   |  14 +-
 src/query_frontend/src/promql/convert.rs           |  12 +-
 src/query_frontend/src/promql/remote.rs            |   2 +-
 src/query_frontend/src/provider.rs                 |   2 +-
 src/table_engine/src/memory.rs                     |   2 +-
 src/table_engine/src/predicate.rs                  |   6 +-
 src/table_engine/src/provider.rs                   |  83 +++-
 src/table_engine/src/table.rs                      |   1 +
 46 files changed, 685 insertions(+), 370 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5f230330..f43a9036 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -90,7 +90,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "arc-swap 1.6.0",
  "arena",
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "async-stream",
  "async-trait",
  "atomic_enum",
@@ -120,7 +120,7 @@ dependencies = [
  "parquet_ext",
  "pin-project-lite",
  "prometheus 0.12.0",
- "prost",
+ "prost 0.11.8",
  "rand 0.7.3",
  "remote_engine_client",
  "router",
@@ -245,24 +245,24 @@ dependencies = [
 
 [[package]]
 name = "arrow"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2feeebd77b34b0bc88f224e06d01c27da4733997cc4789a4e056196656cdc59a"
+checksum = "5bc25126d18a012146a888a0298f2c22e1150327bd2765fc76d710a556b2d614"
 dependencies = [
  "ahash 0.8.3",
- "arrow-arith 43.0.0",
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-csv 43.0.0",
- "arrow-data 43.0.0",
- "arrow-ipc 43.0.0",
- "arrow-json 43.0.0",
- "arrow-ord 43.0.0",
- "arrow-row 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
- "arrow-string 43.0.0",
+ "arrow-arith 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-csv 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-ipc 49.0.0",
+ "arrow-json 49.0.0",
+ "arrow-ord 49.0.0",
+ "arrow-row 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
+ "arrow-string 49.0.0",
 ]
 
 [[package]]
@@ -282,14 +282,14 @@ dependencies = [
 
 [[package]]
 name = "arrow-arith"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7173f5dc49c0ecb5135f52565af33afd3fdc9a12d13bd6f9973e8b96305e4b2e"
+checksum = "34ccd45e217ffa6e53bbb0080990e77113bdd4e91ddb84e97b77649810bcf1a7"
 dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
  "chrono",
  "half 2.2.1",
  "num",
@@ -313,14 +313,14 @@ dependencies = [
 
 [[package]]
 name = "arrow-array"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "63d7ea725f7d1f8bb2cffc53ef538557e95fc802e217d5be25122d402e22f3d0"
+checksum = "6bda9acea48b25123c08340f3a8ac361aa0f74469bb36f5ee9acf923fce23e9d"
 dependencies = [
  "ahash 0.8.3",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
  "chrono",
  "chrono-tz",
  "half 2.2.1",
@@ -340,10 +340,11 @@ dependencies = [
 
 [[package]]
 name = "arrow-buffer"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bdbe439e077f484e5000b9e1d47b5e4c0d15f2b311a8f5bcc682553d5d67a722"
+checksum = "01a0fc21915b00fc6c2667b069c1b64bdd920982f426079bc4a7cab86822886c"
 dependencies = [
+ "bytes",
  "half 2.2.1",
  "num",
 ]
@@ -366,15 +367,16 @@ dependencies = [
 
 [[package]]
 name = "arrow-cast"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "93913cc14875770aa1eef5e310765e855effa352c094cb1c7c00607d0f37b4e1"
+checksum = "5dc0368ed618d509636c1e3cc20db1281148190a78f43519487b2daf07b63b4a"
 dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
+ "base64 0.21.0",
  "chrono",
  "comfy-table 7.0.1",
  "half 2.2.1",
@@ -403,15 +405,15 @@ dependencies = [
 
 [[package]]
 name = "arrow-csv"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ef55b67c55ed877e6fe7b923121c19dae5e31ca70249ea2779a17b58fb0fbd9a"
+checksum = "2e09aa6246a1d6459b3f14baeaa49606cfdbca34435c46320e14054d244987ca"
 dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
  "chrono",
  "csv",
  "csv-core",
@@ -434,12 +436,12 @@ dependencies = [
 
 [[package]]
 name = "arrow-data"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d4f4f4a3c54614126a71ab91f6631c9743eb4643d6e9318b74191da9dc6e028b"
+checksum = "907fafe280a3874474678c1858b9ca4cb7fd83fb8034ff5b6d6376205a08c634"
 dependencies = [
- "arrow-buffer 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-schema 49.0.0",
  "half 2.2.1",
  "num",
 ]
@@ -460,15 +462,15 @@ dependencies = [
 
 [[package]]
 name = "arrow-ipc"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d41a3659f984a524ef1c2981d43747b24d8eec78e2425267fcd0ef34ce71cd18"
+checksum = "79a43d6808411886b8c7d4f6f7dd477029c1e77ffffffb7923555cc6579639cd"
 dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
  "flatbuffers",
 ]
 
@@ -494,15 +496,15 @@ dependencies = [
 
 [[package]]
 name = "arrow-json"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "10b95faa95a378f56ef32d84cc0104ea998c39ef7cd1faaa6b4cebf8ea92846d"
+checksum = "d82565c91fd627922ebfe2810ee4e8346841b6f9361b87505a9acea38b614fee"
 dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
  "chrono",
  "half 2.2.1",
  "indexmap 2.0.0",
@@ -529,15 +531,15 @@ dependencies = [
 
 [[package]]
 name = "arrow-ord"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c68549a4284d9f8b39586afb8d5ff8158b8f0286353a4844deb1d11cf1ba1f26"
+checksum = "9b23b0e53c0db57c6749997fd343d4c0354c994be7eca67152dd2bdb9a3e1bb4"
 dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
  "half 2.2.1",
  "num",
 ]
@@ -559,15 +561,15 @@ dependencies = [
 
 [[package]]
 name = "arrow-row"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a75a4a757afc301ce010adadff54d79d66140c4282ed3de565f6ccb716a5cf3"
+checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a"
 dependencies = [
  "ahash 0.8.3",
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
  "half 2.2.1",
  "hashbrown 0.14.0",
 ]
@@ -580,9 +582,9 @@ checksum = "bc85923d8d6662cc66ac6602c7d1876872e671002d60993dfdf492a6badeae92"
 
 [[package]]
 name = "arrow-schema"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2bebcb57eef570b15afbcf2d07d813eb476fde9f6dd69c81004d6476c197e87e"
+checksum = "09e28a5e781bf1b0f981333684ad13f5901f4cd2f20589eab7cf1797da8fc167"
 
 [[package]]
 name = "arrow-select"
@@ -599,14 +601,15 @@ dependencies = [
 
 [[package]]
 name = "arrow-select"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6e2943fa433a48921e914417173816af64eef61c0a3d448280e6c40a62df221"
+checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036"
 dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
+ "ahash 0.8.3",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
  "num",
 ]
 
@@ -627,37 +630,37 @@ dependencies = [
 
 [[package]]
 name = "arrow-string"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbc92ed638851774f6d7af1ad900b92bc1486746497511868b4298fcbcfa35af"
+checksum = "a4a48149c63c11c9ff571e50ab8f017d2a7cb71037a882b42f6354ed2da9acc7"
 dependencies = [
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-data 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
  "num",
  "regex",
- "regex-syntax 0.7.1",
+ "regex-syntax 0.8.2",
 ]
 
 [[package]]
 name = "arrow_ext"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "serde",
  "snafu 0.6.10",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
 ]
 
 [[package]]
 name = "arrow_util"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
  "ahash 0.8.3",
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "chrono",
  "comfy-table 6.1.4",
  "hashbrown 0.13.2",
@@ -682,8 +685,8 @@ dependencies = [
  "pin-project-lite",
  "tokio",
  "xz2",
- "zstd",
- "zstd-safe",
+ "zstd 0.12.3+zstd.1.5.2",
+ "zstd-safe 6.0.4+zstd.1.5.4",
 ]
 
 [[package]]
@@ -750,9 +753,9 @@ dependencies = [
 
 [[package]]
 name = "async-trait"
-version = "0.1.72"
+version = "0.1.77"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09"
+checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -881,7 +884,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "analytic_engine",
  "arena",
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "base64 0.13.1",
  "bytes_ext",
  "clap 3.2.23",
@@ -908,7 +911,7 @@ dependencies = [
  "toml_ext",
  "trace_metric",
  "wal",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
 ]
 
 [[package]]
@@ -1452,7 +1455,7 @@ dependencies = [
  "logger",
  "macros",
  "meta_client",
- "prost",
+ "prost 0.11.8",
  "runtime",
  "serde",
  "serde_json",
@@ -1519,7 +1522,7 @@ dependencies = [
 name = "common_types"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "arrow_ext",
  "bytes_ext",
  "chrono",
@@ -1528,7 +1531,7 @@ dependencies = [
  "horaedbproto 2.0.0",
  "macros",
  "paste 1.0.12",
- "prost",
+ "prost 0.11.8",
  "rand 0.7.3",
  "seahash",
  "serde",
@@ -1565,7 +1568,7 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
 dependencies = [
- "prost",
+ "prost 0.11.8",
  "prost-types",
  "tonic 0.9.2",
  "tracing-core",
@@ -2003,13 +2006,13 @@ dependencies = [
 
 [[package]]
 name = "datafusion"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
 dependencies = [
  "ahash 0.8.3",
- "arrow 43.0.0",
- "arrow-array 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-schema 49.0.0",
  "async-compression",
  "async-trait",
  "bytes",
@@ -2021,24 +2024,22 @@ dependencies = [
  "datafusion-expr",
  "datafusion-optimizer",
  "datafusion-physical-expr",
- "datafusion-row",
+ "datafusion-physical-plan",
  "datafusion-sql",
  "flate2",
  "futures 0.3.28",
  "glob",
+ "half 2.2.1",
  "hashbrown 0.14.0",
  "indexmap 2.0.0",
- "itertools 0.11.0",
- "lazy_static",
+ "itertools 0.12.0",
  "log",
  "num_cpus",
- "object_store 0.6.1",
+ "object_store 0.8.0",
  "parking_lot 0.12.1",
  "parquet",
- "percent-encoding",
  "pin-project-lite",
  "rand 0.8.5",
- "smallvec",
  "sqlparser",
  "tempfile",
  "tokio",
@@ -2046,34 +2047,42 @@ dependencies = [
  "url",
  "uuid",
  "xz2",
- "zstd",
+ "zstd 0.13.0",
 ]
 
 [[package]]
 name = "datafusion-common"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
 dependencies = [
- "arrow 43.0.0",
- "arrow-array 43.0.0",
+ "ahash 0.8.3",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-schema 49.0.0",
  "chrono",
+ "half 2.2.1",
+ "libc",
  "num_cpus",
- "object_store 0.6.1",
+ "object_store 0.8.0",
  "parquet",
  "sqlparser",
 ]
 
 [[package]]
 name = "datafusion-execution"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
 dependencies = [
+ "arrow 49.0.0",
+ "chrono",
  "dashmap 5.4.0",
  "datafusion-common",
  "datafusion-expr",
+ "futures 0.3.28",
  "hashbrown 0.14.0",
  "log",
- "object_store 0.6.1",
+ "object_store 0.8.0",
  "parking_lot 0.12.1",
  "rand 0.8.5",
  "tempfile",
@@ -2082,13 +2091,14 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
 dependencies = [
  "ahash 0.8.3",
- "arrow 43.0.0",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
  "datafusion-common",
- "lazy_static",
+ "paste 1.0.12",
  "sqlparser",
  "strum 0.25.0",
  "strum_macros 0.25.1",
@@ -2096,45 +2106,43 @@ dependencies = [
 
 [[package]]
 name = "datafusion-optimizer"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "async-trait",
  "chrono",
  "datafusion-common",
  "datafusion-expr",
  "datafusion-physical-expr",
  "hashbrown 0.14.0",
- "itertools 0.11.0",
+ "itertools 0.12.0",
  "log",
- "regex-syntax 0.7.1",
+ "regex-syntax 0.8.2",
 ]
 
 [[package]]
 name = "datafusion-physical-expr"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
 dependencies = [
  "ahash 0.8.3",
- "arrow 43.0.0",
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-ord 49.0.0",
+ "arrow-schema 49.0.0",
  "base64 0.21.0",
  "blake2",
  "blake3",
  "chrono",
  "datafusion-common",
  "datafusion-expr",
- "datafusion-row",
  "half 2.2.1",
  "hashbrown 0.14.0",
  "hex",
  "indexmap 2.0.0",
- "itertools 0.11.0",
- "lazy_static",
- "libc",
+ "itertools 0.12.0",
  "log",
  "md-5",
  "paste 1.0.12",
@@ -2147,37 +2155,56 @@ dependencies = [
 ]
 
 [[package]]
-name = "datafusion-proto"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+name = "datafusion-physical-plan"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
 dependencies = [
- "arrow 43.0.0",
+ "ahash 0.8.3",
+ "arrow 49.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-schema 49.0.0",
+ "async-trait",
  "chrono",
- "datafusion",
  "datafusion-common",
+ "datafusion-execution",
  "datafusion-expr",
- "object_store 0.6.1",
- "prost",
+ "datafusion-physical-expr",
+ "futures 0.3.28",
+ "half 2.2.1",
+ "hashbrown 0.14.0",
+ "indexmap 2.0.0",
+ "itertools 0.12.0",
+ "log",
+ "once_cell",
+ "parking_lot 0.12.1",
+ "pin-project-lite",
+ "rand 0.8.5",
+ "tokio",
+ "uuid",
 ]
 
 [[package]]
-name = "datafusion-row"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+name = "datafusion-proto"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
+ "chrono",
+ "datafusion",
  "datafusion-common",
- "paste 1.0.12",
- "rand 0.8.5",
+ "datafusion-expr",
+ "object_store 0.8.0",
+ "prost 0.12.3",
 ]
 
 [[package]]
 name = "datafusion-sql"
-version = "27.0.0"
-source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d"
+version = "33.0.0"
+source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1"
 dependencies = [
- "arrow 43.0.0",
- "arrow-schema 43.0.0",
+ "arrow 49.0.0",
+ "arrow-schema 49.0.0",
  "datafusion-common",
  "datafusion-expr",
  "log",
@@ -2187,7 +2214,7 @@ dependencies = [
 [[package]]
 name = "datafusion_util"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
  "async-trait",
  "datafusion",
@@ -2305,7 +2332,7 @@ dependencies = [
 name = "df_engine_extensions"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "async-recursion",
  "async-trait",
  "catalog",
@@ -2318,7 +2345,7 @@ dependencies = [
  "insta",
  "lazy_static",
  "prometheus 0.12.0",
- "prost",
+ "prost 0.11.8",
  "runtime",
  "snafu 0.6.10",
  "table_engine",
@@ -2330,7 +2357,7 @@ dependencies = [
 name = "df_operator"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "base64 0.13.1",
  "bincode",
  "chrono",
@@ -2470,7 +2497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4319dc0fb739a6e84cb8678b8cf50c9bcfa4712ae826b33ecf00cc0850550a58"
 dependencies = [
  "http",
- "prost",
+ "prost 0.11.8",
  "tokio",
  "tokio-stream",
  "tonic 0.8.3",
@@ -2808,12 +2835,12 @@ checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
 [[package]]
 name = "generated_types"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
  "pbjson",
  "pbjson-build",
  "pbjson-types",
- "prost",
+ "prost 0.11.8",
  "prost-build",
  "serde",
  "tonic-build",
@@ -3071,7 +3098,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tonic 0.8.3",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
 ]
 
 [[package]]
@@ -3095,7 +3122,7 @@ version = "1.0.24"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5907c770ee20818978cf2050341ca2c4c7fb7888423ccb090cbb2fda250dfad7"
 dependencies = [
- "prost",
+ "prost 0.11.8",
  "protoc-bin-vendored",
  "tonic 0.8.3",
  "tonic-build",
@@ -3107,7 +3134,7 @@ name = "horaedbproto"
 version = "2.0.0"
 source = "git+https://github.com/apache/incubator-horaedb-proto.git?rev=19ece8f771fc0b3e8e734072cc3d8040de6c74cb#19ece8f771fc0b3e8e734072cc3d8040de6c74cb"
 dependencies = [
- "prost",
+ "prost 0.11.8",
  "protoc-bin-vendored",
  "tonic 0.8.3",
  "tonic-build",
@@ -3325,7 +3352,7 @@ dependencies = [
 [[package]]
 name = "influxdb_influxql_parser"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
  "chrono",
  "chrono-tz",
@@ -3367,7 +3394,7 @@ name = "interpreters"
 version = "1.2.6-alpha"
 dependencies = [
  "analytic_engine",
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "async-trait",
  "catalog",
  "catalog_impls",
@@ -3418,9 +3445,9 @@ dependencies = [
 [[package]]
 name = "iox_query"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "arrow_util",
  "async-trait",
  "chrono",
@@ -3442,9 +3469,9 @@ dependencies = [
 [[package]]
 name = "iox_query_influxql"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "chrono",
  "chrono-tz",
  "datafusion",
@@ -3497,6 +3524,15 @@ dependencies = [
  "either",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itoa"
 version = "1.0.6"
@@ -3953,7 +3989,7 @@ dependencies = [
  "horaedbproto 2.0.0",
  "logger",
  "macros",
- "prost",
+ "prost 0.11.8",
  "reqwest",
  "serde",
  "serde_json",
@@ -4314,9 +4350,9 @@ dependencies = [
 
 [[package]]
 name = "num"
-version = "0.4.0"
+version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606"
+checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af"
 dependencies = [
  "num-bigint",
  "num-complex",
@@ -4456,16 +4492,16 @@ dependencies = [
 
 [[package]]
 name = "object_store"
-version = "0.6.1"
+version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "27c776db4f332b571958444982ff641d2531417a326ca368995073b639205d58"
+checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050"
 dependencies = [
  "async-trait",
  "bytes",
  "chrono",
  "futures 0.3.28",
  "humantime 2.1.0",
- "itertools 0.10.5",
+ "itertools 0.11.0",
  "parking_lot 0.12.1",
  "percent-encoding",
  "snafu 0.7.4",
@@ -4497,7 +4533,7 @@ dependencies = [
  "partitioned_lock",
  "prometheus 0.12.0",
  "prometheus-static-metric",
- "prost",
+ "prost 0.11.8",
  "rand 0.7.3",
  "runtime",
  "serde",
@@ -4545,13 +4581,13 @@ dependencies = [
  "tokio",
  "tokio-util",
  "uuid",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
 ]
 
 [[package]]
 name = "observability_deps"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
  "tracing",
 ]
@@ -4675,18 +4711,18 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "43.0.0"
+version = "49.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec7267a9607c3f955d4d0ac41b88a67cecc0d8d009173ad3da390699a6cb3750"
+checksum = "af88740a842787da39b3d69ce5fbf6fce97d20211d3b299fee0a0da6430c74d4"
 dependencies = [
  "ahash 0.8.3",
- "arrow-array 43.0.0",
- "arrow-buffer 43.0.0",
- "arrow-cast 43.0.0",
- "arrow-data 43.0.0",
- "arrow-ipc 43.0.0",
- "arrow-schema 43.0.0",
- "arrow-select 43.0.0",
+ "arrow-array 49.0.0",
+ "arrow-buffer 49.0.0",
+ "arrow-cast 49.0.0",
+ "arrow-data 49.0.0",
+ "arrow-ipc 49.0.0",
+ "arrow-schema 49.0.0",
+ "arrow-select 49.0.0",
  "base64 0.21.0",
  "brotli",
  "bytes",
@@ -4694,24 +4730,24 @@ dependencies = [
  "flate2",
  "futures 0.3.28",
  "hashbrown 0.14.0",
- "lz4",
+ "lz4_flex",
  "num",
  "num-bigint",
- "object_store 0.6.1",
+ "object_store 0.8.0",
  "paste 1.0.12",
  "seq-macro",
  "snap",
  "thrift",
  "tokio",
  "twox-hash",
- "zstd",
+ "zstd 0.13.0",
 ]
 
 [[package]]
 name = "parquet_ext"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "arrow_ext",
  "async-trait",
  "bytes",
@@ -4738,7 +4774,7 @@ name = "partition_table_engine"
 version = "1.2.6-alpha"
 dependencies = [
  "analytic_engine",
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "async-trait",
  "common_types",
  "datafusion",
@@ -4805,7 +4841,7 @@ checksum = "bdbb7b706f2afc610f3853550cdbbf6372fd324824a087806bd4480ea4996e24"
 dependencies = [
  "heck",
  "itertools 0.10.5",
- "prost",
+ "prost 0.11.8",
  "prost-types",
 ]
 
@@ -4819,7 +4855,7 @@ dependencies = [
  "chrono",
  "pbjson",
  "pbjson-build",
- "prost",
+ "prost 0.11.8",
  "prost-build",
  "serde",
 ]
@@ -5179,7 +5215,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "futures 0.3.28",
- "prost",
+ "prost 0.11.8",
  "prost-build",
  "snap",
  "warp",
@@ -5256,7 +5292,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537"
 dependencies = [
  "bytes",
- "prost-derive",
+ "prost-derive 0.11.8",
+]
+
+[[package]]
+name = "prost"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a"
+dependencies = [
+ "bytes",
+ "prost-derive 0.12.3",
 ]
 
 [[package]]
@@ -5273,7 +5319,7 @@ dependencies = [
  "multimap",
  "petgraph",
  "prettyplease 0.1.25",
- "prost",
+ "prost 0.11.8",
  "prost-types",
  "regex",
  "syn 1.0.109",
@@ -5294,13 +5340,26 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "prost-derive"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e"
+dependencies = [
+ "anyhow",
+ "itertools 0.11.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.48",
+]
+
 [[package]]
 name = "prost-types"
 version = "0.11.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88"
 dependencies = [
- "prost",
+ "prost 0.11.8",
 ]
 
 [[package]]
@@ -5363,7 +5422,7 @@ checksum = "9653c3ed92974e34c5a6e0a510864dab979760481714c172e0a34e437cb98804"
 name = "proxy"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "arrow_ext",
  "async-trait",
  "bytes",
@@ -5391,7 +5450,7 @@ dependencies = [
  "prom-remote-api",
  "prometheus 0.12.0",
  "prometheus-static-metric",
- "prost",
+ "prost 0.11.8",
  "query_engine",
  "query_frontend",
  "router",
@@ -5409,7 +5468,7 @@ dependencies = [
  "tokio-stream",
  "tonic 0.8.3",
  "warp",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
 ]
 
 [[package]]
@@ -5463,7 +5522,7 @@ dependencies = [
 name = "query_engine"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "async-trait",
  "bytes_ext",
  "catalog",
@@ -5478,7 +5537,7 @@ dependencies = [
  "iox_query",
  "logger",
  "macros",
- "prost",
+ "prost 0.11.8",
  "query_frontend",
  "runtime",
  "serde",
@@ -5493,7 +5552,7 @@ dependencies = [
 name = "query_frontend"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "async-trait",
  "catalog",
  "chrono",
@@ -5529,9 +5588,9 @@ dependencies = [
 [[package]]
 name = "query_functions"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "chrono",
  "datafusion",
  "itertools 0.10.5",
@@ -5802,6 +5861,12 @@ version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c"
 
+[[package]]
+name = "regex-syntax"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
+
 [[package]]
 name = "remote_engine_client"
 version = "1.2.6-alpha"
@@ -6227,9 +6292,9 @@ dependencies = [
 [[package]]
 name = "schema"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "hashbrown 0.13.2",
  "indexmap 1.9.3",
  "itertools 0.10.5",
@@ -6353,7 +6418,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "analytic_engine",
  "arc-swap 1.6.0",
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "arrow_ext",
  "async-trait",
  "bytes_ext",
@@ -6386,7 +6451,7 @@ dependencies = [
  "prom-remote-api",
  "prometheus 0.12.0",
  "prometheus-static-metric",
- "prost",
+ "prost 0.11.8",
  "proxy",
  "query_engine",
  "query_frontend",
@@ -6407,7 +6472,7 @@ dependencies = [
  "tonic 0.8.3",
  "wal",
  "warp",
- "zstd",
+ "zstd 0.12.3+zstd.1.5.2",
 ]
 
 [[package]]
@@ -6717,9 +6782,9 @@ dependencies = [
 
 [[package]]
 name = "sqlparser"
-version = "0.35.0"
+version = "0.39.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca597d77c98894be1f965f2e4e2d2a61575d4998088e655476c73715c54b2b43"
+checksum = "743b4dc2cbde11890ccb254a8fc9d537fa41b36da00de2a1c5e9848c9bc42bd7"
 dependencies = [
  "log",
  "serde",
@@ -6897,7 +6962,7 @@ dependencies = [
 name = "system_catalog"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "async-trait",
  "bytes_ext",
  "catalog",
@@ -6908,7 +6973,7 @@ dependencies = [
  "horaedbproto 2.0.0",
  "logger",
  "macros",
- "prost",
+ "prost 0.11.8",
  "snafu 0.6.10",
  "table_engine",
  "tokio",
@@ -6927,7 +6992,7 @@ dependencies = [
 name = "table_engine"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "arrow_ext",
  "async-trait",
  "bytes_ext",
@@ -6943,7 +7008,7 @@ dependencies = [
  "lazy_static",
  "logger",
  "macros",
- "prost",
+ "prost 0.11.8",
  "rand 0.7.3",
  "regex",
  "runtime",
@@ -7024,7 +7089,7 @@ dependencies = [
 [[package]]
 name = "test_helpers"
 version = "0.1.0"
-source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
+source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08"
 dependencies = [
  "dotenvy",
  "observability_deps",
@@ -7038,7 +7103,7 @@ dependencies = [
 name = "test_util"
 version = "1.2.6-alpha"
 dependencies = [
- "arrow 43.0.0",
+ "arrow 49.0.0",
  "chrono",
  "common_types",
  "env_logger",
@@ -7375,8 +7440,8 @@ dependencies = [
  "hyper-timeout",
  "percent-encoding",
  "pin-project",
- "prost",
- "prost-derive",
+ "prost 0.11.8",
+ "prost-derive 0.11.8",
  "rustls-pemfile 1.0.2",
  "tokio",
  "tokio-rustls 0.23.4",
@@ -7408,7 +7473,7 @@ dependencies = [
  "hyper-timeout",
  "percent-encoding",
  "pin-project",
- "prost",
+ "prost 0.11.8",
  "tokio",
  "tokio-stream",
  "tower",
@@ -7647,7 +7712,7 @@ version = "1.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
 dependencies = [
- "cfg-if 0.1.10",
+ "cfg-if 1.0.0",
  "rand 0.8.5",
  "static_assertions",
 ]
@@ -7804,7 +7869,7 @@ dependencies = [
  "macros",
  "message_queue",
  "prometheus 0.12.0",
- "prost",
+ "prost 0.11.8",
  "rand 0.8.5",
  "rocksdb",
  "runtime",
@@ -8433,7 +8498,16 @@ version = "0.12.3+zstd.1.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806"
 dependencies = [
- "zstd-safe",
+ "zstd-safe 6.0.4+zstd.1.5.4",
+]
+
+[[package]]
+name = "zstd"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110"
+dependencies = [
+ "zstd-safe 7.0.0",
 ]
 
 [[package]]
@@ -8446,6 +8520,15 @@ dependencies = [
  "zstd-sys",
 ]
 
+[[package]]
+name = "zstd-safe"
+version = "7.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e"
+dependencies = [
+ "zstd-sys",
+]
+
 [[package]]
 name = "zstd-sys"
 version = "2.0.7+zstd.1.5.4"
diff --git a/Cargo.toml b/Cargo.toml
index d195a121..b41694b3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -85,8 +85,8 @@ members = [
 
 [workspace.dependencies]
 alloc_tracker = { path = "src/components/alloc_tracker" }
-arrow = { version = "43.0.0", features = ["prettyprint"] }
-arrow_ipc = { version = "43.0.0" }
+arrow = { version = "49.0.0", features = ["prettyprint"] }
+arrow_ipc = { version = "49.0.0" }
 arrow_ext = { path = "src/components/arrow_ext" }
 analytic_engine = { path = "src/analytic_engine" }
 arena = { path = "src/components/arena" }
@@ -107,8 +107,8 @@ cluster = { path = "src/cluster" }
 criterion = "0.5"
 horaedb-client = "1.0.2"
 common_types = { path = "src/common_types" }
-datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
-datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
+datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
+datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
 derive_builder = "0.12"
 df_operator = { path = "src/df_operator" }
 df_engine_extensions = { path = "src/df_engine_extensions" }
@@ -121,10 +121,10 @@ hash_ext = { path = "src/components/hash_ext" }
 hex = "0.4.3"
 hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" }
 id_allocator = { path = "src/components/id_allocator" }
-influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query_influxql" }
-influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "influxdb_influxql_parser" }
-influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query" }
-influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "schema" }
+influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query_influxql" }
+influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "influxdb_influxql_parser" }
+influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query" }
+influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "schema" }
 interpreters = { path = "src/interpreters" }
 itertools = "0.10.5"
 lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
@@ -142,7 +142,7 @@ panic_ext = { path = "src/components/panic_ext" }
 partitioned_lock = { path = "src/components/partitioned_lock" }
 partition_table_engine = { path = "src/partition_table_engine" }
 parquet_ext = { path = "src/components/parquet_ext" }
-parquet = { version = "43.0.0" }
+parquet = { version = "49.0.0" }
 paste = "1.0"
 pin-project-lite = "0.2.8"
 pprof = "0.12.1"
@@ -172,9 +172,9 @@ size_ext = { path = "src/components/size_ext" }
 smallvec = "1.6"
 slog = "2.7"
 spin = "0.9.6"
-sqlparser = { version = "0.35", features = ["serde"] }
-system_catalog = { path = "src/system_catalog" }
 system_statis = { path = "src/components/system_stats" }
+sqlparser = { version = "0.39.0", features = ["serde"] }
+system_catalog = { path = "src/system_catalog" }
 table_engine = { path = "src/table_engine" }
 table_kv = { path = "src/components/table_kv" }
 tempfile = "3.1.0"
diff --git a/integration_tests/cases/common/dml/issue-1087.result 
b/integration_tests/cases/common/dml/issue-1087.result
index d264f4d2..fc1e0d8d 100644
--- a/integration_tests/cases/common/dml/issue-1087.result
+++ b/integration_tests/cases/common/dml/issue-1087.result
@@ -17,6 +17,7 @@ String("logical_plan after inline_table_scan"),String("SAME 
TEXT AS ABOVE"),
 String("logical_plan after type_coercion"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after count_wildcard_rule"),String("SAME TEXT AS ABOVE"),
 String("analyzed_logical_plan"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS 
ABOVE"),
 String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS 
ABOVE"),
 String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS 
ABOVE"),
@@ -33,6 +34,7 @@ String("logical_plan after 
eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT 
AS ABOVE"),
 String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS 
ABOVE"),
+String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS 
ABOVE"),
 String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
@@ -46,6 +48,7 @@ String("logical_plan after 
eliminate_projection"),String("TableScan: issue_1087
 String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME 
TEXT AS ABOVE"),
 String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS 
ABOVE"),
 String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS 
ABOVE"),
 String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS 
ABOVE"),
@@ -62,6 +65,7 @@ String("logical_plan after 
eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT 
AS ABOVE"),
 String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS 
ABOVE"),
+String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS 
ABOVE"),
 String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
 String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
@@ -76,17 +80,22 @@ String("logical_plan after push_down_limit"),String("SAME 
TEXT AS ABOVE"),
 String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME 
TEXT AS ABOVE"),
 String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
 String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t, 
name, value]"),
-String("initial_physical_plan"),String("ScanTable: table=issue_1087, 
parallelism=8, priority=Low\n"),
+String("initial_physical_plan"),String("ScanTable: table=issue_1087, 
parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
+String("initial_physical_plan_with_stats"),String("ScanTable: 
table=issue_1087, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), statistics=[Rows=Absent, Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"),
+String("physical_plan after 
OutputRequirements"),String("OutputRequirementExec\n  ScanTable: 
table=issue_1087, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8)\n"),
 String("physical_plan after aggregate_statistics"),String("SAME TEXT AS 
ABOVE"),
 String("physical_plan after join_selection"),String("SAME TEXT AS ABOVE"),
-String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"),
-String("physical_plan after repartition"),String("SAME TEXT AS ABOVE"),
+String("physical_plan after LimitedDistinctAggregation"),String("SAME TEXT AS 
ABOVE"),
 String("physical_plan after EnforceDistribution"),String("SAME TEXT AS ABOVE"),
 String("physical_plan after CombinePartialFinalAggregate"),String("SAME TEXT 
AS ABOVE"),
 String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"),
 String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"),
+String("physical_plan after OutputRequirements"),String("ScanTable: 
table=issue_1087, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8)\n"),
 String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"),
-String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, 
priority=Low\n"),
+String("physical_plan after LimitAggregation"),String("SAME TEXT AS ABOVE"),
+String("physical_plan after ProjectionPushdown"),String("SAME TEXT AS ABOVE"),
+String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8)\n"),
+String("physical_plan_with_stats"),String("ScanTable: table=issue_1087, 
parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), 
statistics=[Rows=Absent, Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"),
 
 
 DROP TABLE `issue_1087`;
diff --git a/integration_tests/cases/common/dml/issue-302.result 
b/integration_tests/cases/common/dml/issue-302.result
index b57d881f..cd7afc3a 100644
--- a/integration_tests/cases/common/dml/issue-302.result
+++ b/integration_tests/cases/common/dml/issue-302.result
@@ -12,7 +12,7 @@ affected_rows: 1
 
 select `t`, count(distinct name) from issue302 group by `t`;
 
-issue302.t,COUNT(DISTINCT issue302.name),
+t,COUNT(DISTINCT issue302.name),
 Timestamp(1651737067000),Int64(0),
 
 
diff --git a/integration_tests/cases/common/dml/issue-341.result 
b/integration_tests/cases/common/dml/issue-341.result
index 90222259..4d7da95c 100644
--- a/integration_tests/cases/common/dml/issue-341.result
+++ b/integration_tests/cases/common/dml/issue-341.result
@@ -58,7 +58,7 @@ WHERE
 
 plan_type,plan,
 String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, 
value], full_filters=[issue341_t1.value = Int32(3)]"),
-String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8, 
priority=Low\n"),
+String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8)\n"),
 
 
 -- FilterExec node should not be in plan.
@@ -71,8 +71,8 @@ WHERE
     tag1 = "t3";
 
 plan_type,plan,
-String("logical_plan"),String("Projection: issue341_t1.timestamp, 
issue341_t1.value\n  TableScan: issue341_t1 projection=[timestamp, value, 
tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
-String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as 
timestamp, value@1 as value]\n  ScanTable: table=issue341_t1, parallelism=8, 
priority=Low\n"),
+String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, 
value], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as 
timestamp, value@1 as value]\n  ScanTable: table=issue341_t1, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8)\n"),
 
 
 -- Repeat operations above, but with overwrite table
@@ -116,7 +116,7 @@ WHERE
 
 plan_type,plan,
 String("logical_plan"),String("Filter: issue341_t2.value = Float64(3)\n  
TableScan: issue341_t2 projection=[timestamp, value], 
partial_filters=[issue341_t2.value = Float64(3)]"),
-String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n  
FilterExec: value@1 = 3\n    ScanTable: table=issue341_t2, parallelism=8, 
priority=Low\n"),
+String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n  
FilterExec: value@1 = 3\n    ScanTable: table=issue341_t2, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8)\n"),
 
 
 -- When using tag as filter, FilterExec node should not be in plan.
@@ -129,8 +129,8 @@ WHERE
     tag1 = "t3";
 
 plan_type,plan,
-String("logical_plan"),String("Projection: issue341_t2.timestamp, 
issue341_t2.value\n  TableScan: issue341_t2 projection=[timestamp, value, 
tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
-String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as 
timestamp, value@1 as value]\n  ScanTable: table=issue341_t2, parallelism=8, 
priority=Low\n"),
+String("logical_plan"),String("TableScan: issue341_t2 projection=[timestamp, 
value], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as 
timestamp, value@1 as value]\n  ScanTable: table=issue341_t2, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8)\n"),
 
 
 DROP TABLE IF EXISTS `issue341_t1`;
diff --git a/integration_tests/cases/common/dml/issue-59.result 
b/integration_tests/cases/common/dml/issue-59.result
index 549c7019..4f7544c8 100644
--- a/integration_tests/cases/common/dml/issue-59.result
+++ b/integration_tests/cases/common/dml/issue-59.result
@@ -24,8 +24,8 @@ FROM issue59
 GROUP BY id+1;
 
 plan_type,plan,
-String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + 
Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n  Aggregate: 
groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n    Projection: 
group_alias_0, alias1\n      Aggregate: groupBy=[[CAST(issue59.id AS Int64) + 
Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n        
TableScan: issue59 projection=[id, account]"),
-String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as 
issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n  
AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], 
aggr=[COUNT(alias1)]\n    CoalesceBatchesExec: target_batch_size=8192\n      
RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n  
      AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], 
aggr=[COUNT(alias1)]\n          [...]
+String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + 
Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n  Aggregate: 
groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n    Aggregate: 
groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, 
issue59.account AS alias1]], aggr=[[]]\n      TableScan: issue59 
projection=[id, account]"),
+String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as 
issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n  
AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], 
aggr=[COUNT(alias1)]\n    CoalesceBatchesExec: target_batch_size=8192\n      
RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n  
      AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], 
aggr=[COUNT(alias1)]\n          [...]
 
 
 DROP TABLE IF EXISTS issue59;
diff --git a/integration_tests/cases/common/explain/explain.result 
b/integration_tests/cases/common/explain/explain.result
index 0cd06380..6cf09c07 100644
--- a/integration_tests/cases/common/explain/explain.result
+++ b/integration_tests/cases/common/explain/explain.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT t FROM `04_explain_t`;
 
 plan_type,plan,
 String("logical_plan"),String("TableScan: 04_explain_t projection=[t]"),
-String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8, 
priority=Low\n"),
+String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8)\n"),
 
 
 DROP TABLE `04_explain_t`;
diff --git a/integration_tests/cases/common/function/aggregate.result 
b/integration_tests/cases/common/function/aggregate.result
index 037e503a..f45a6841 100644
--- a/integration_tests/cases/common/function/aggregate.result
+++ b/integration_tests/cases/common/function/aggregate.result
@@ -105,7 +105,50 @@ COUNT(DISTINCT 02_function_aggregate_table1.arch),
 Int64(2),
 
 
+CREATE TABLE `02_function_aggregate_table2` (
+    `timestamp` timestamp NOT NULL,
+    `arch` string TAG,
+    `datacenter` string TAG,
+    `value` int,
+    `uvalue` uint64,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+        enable_ttl='false',
+     update_mode = 'append'
+);
+
+affected_rows: 0
+
+INSERT INTO `02_function_aggregate_table2`
+    (`timestamp`, `arch`, `datacenter`, `value`, `uvalue`)
+VALUES
+    (1658304762, 'x86-64', 'china', 100, 10),
+    (1658304763, 'x86-64', 'china', 200, 10),
+    (1658304762, 'arm64', 'china', 110, 0),
+    (1658304763, 'arm64', 'china', 210, 0);
+
+affected_rows: 4
+
+-- The should select empty column
+SELECT count(*) FROM `02_function_aggregate_table1`;
+
+COUNT(*),
+Int64(4),
+
+
+-- Same with before, but query from sst
+-- SQLNESS ARG pre_cmd=flush
+SELECT count(*) FROM `02_function_aggregate_table1`;
+
+COUNT(*),
+Int64(4),
+
+
 DROP TABLE `02_function_aggregate_table1`;
 
 affected_rows: 0
 
+DROP TABLE `02_function_aggregate_table2`;
+
+affected_rows: 0
+
diff --git a/integration_tests/cases/common/function/aggregate.sql 
b/integration_tests/cases/common/function/aggregate.sql
index c4f8dd50..8543245a 100644
--- a/integration_tests/cases/common/function/aggregate.sql
+++ b/integration_tests/cases/common/function/aggregate.sql
@@ -57,4 +57,32 @@ SELECT distinct(`arch`) FROM `02_function_aggregate_table1` 
ORDER BY `arch` DESC
 
 SELECT count(distinct(`arch`)) FROM `02_function_aggregate_table1`;
 
+CREATE TABLE `02_function_aggregate_table2` (
+    `timestamp` timestamp NOT NULL,
+    `arch` string TAG,
+    `datacenter` string TAG,
+    `value` int,
+    `uvalue` uint64,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+        enable_ttl='false',
+     update_mode = 'append'
+);
+
+INSERT INTO `02_function_aggregate_table2`
+    (`timestamp`, `arch`, `datacenter`, `value`, `uvalue`)
+VALUES
+    (1658304762, 'x86-64', 'china', 100, 10),
+    (1658304763, 'x86-64', 'china', 200, 10),
+    (1658304762, 'arm64', 'china', 110, 0),
+    (1658304763, 'arm64', 'china', 210, 0);
+
+-- The should select empty column
+SELECT count(*) FROM `02_function_aggregate_table1`;
+
+-- Same with before, but query from sst
+-- SQLNESS ARG pre_cmd=flush
+SELECT count(*) FROM `02_function_aggregate_table1`;
+
 DROP TABLE `02_function_aggregate_table1`;
+DROP TABLE `02_function_aggregate_table2`;
diff --git a/integration_tests/cases/common/optimizer/optimizer.result 
b/integration_tests/cases/common/optimizer/optimizer.result
index f9cfac2d..5df9f47e 100644
--- a/integration_tests/cases/common/optimizer/optimizer.result
+++ b/integration_tests/cases/common/optimizer/optimizer.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM 
`07_optimizer_t` GROUP BY
 
 plan_type,plan,
 String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, 
AVG(07_optimizer_t.value) AS c2\n  Aggregate: groupBy=[[07_optimizer_t.name]], 
aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n    TableScan: 
07_optimizer_t projection=[name, value]"),
-String("physical_plan"),String("ProjectionExec: 
expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n  
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], 
aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n    
CoalesceBatchesExec: target_batch_size=8192\n      RepartitionExec: 
partitioning=Hash([name@0], 8), input_partitions=8\n        AggregateExec: 
mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), 
AVG(07_optimizer_t.value)]\n [...]
+String("physical_plan"),String("ProjectionExec: 
expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n  
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], 
aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n    
CoalesceBatchesExec: target_batch_size=8192\n      RepartitionExec: 
partitioning=Hash([name@0], 8), input_partitions=8\n        AggregateExec: 
mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), 
AVG(07_optimizer_t.value)]\n [...]
 
 
 DROP TABLE `07_optimizer_t`;
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result 
b/integration_tests/cases/env/cluster/ddl/partition_table.result
index d376718c..233c3483 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -80,19 +80,23 @@ 
UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0
 
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
 
 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n    
__partition_table_t_1:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, 
metrics=[output_rows=0, elapsed_compute=xxs]\n  ScanTable: 
table=__partition_table_t_1, parallelism=8, priority=Low, metrics=[\nPredicate 
{ exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange  [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:1, metrics=xx\n  ScanTable: 
table=__partition_table_t_1, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = 
Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: 
Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) 
} }\nscan_table:\n    do_merge_sort=true\n    iter_num=1\n    merge_iter_0:\n   
     in [...]
 
 
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
+-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
 
 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n    __partition_table_t_x:\n        
poll_duration=xxs\n        total_duration=xxs\n        wait_duration=xxs\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_x:\n [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:3, metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredica [...]
 
 
 ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql 
b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index a36b59ac..a87dfbb2 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -37,11 +37,15 @@ SELECT * from partition_table_t where name in ("horaedb5", 
"horaedb6", "horaedb7
 
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
 
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
+-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
 
 ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/local/ddl/query-plan.result 
b/integration_tests/cases/env/local/ddl/query-plan.result
index a421856b..1f632184 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.result
+++ b/integration_tests/cases/env/local/ddl/query-plan.result
@@ -27,48 +27,53 @@ affected_rows: 3
 
 -- This query should include memtable
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348001000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], 
time_range:TimeRange { inclusive_start: Timestamp(1695348001001), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=true\n    iter_num=1\n    merge_iter_0:\n        
init_duration=xxs\n        num_memtables=1\n        num_ssts=0\n        
scan_count=2\n        scan_durat [...]
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > 
TimestampMillisecond(1695348001000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n        
num_memtables=1\n        num_ssts=0\n [...]
 
 
 -- This query should have higher priority
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_dml_select_real_time_range`
 where t >= 1695348001000 and t < 1695348002000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=High, 
metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t 
< TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(1695348002000) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n        
num_memtables=1\n        num_ssts= [...]
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=High, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= 
TimestampMillisecond(1695348001000, None), t < 
TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(1695348002000) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n   [...]
 
 
 -- This query should not include memtable
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348002000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)], 
time_range:TimeRange { inclusive_start: Timestamp(1695348002001), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=true\n    iter_num=0\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > 
TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348002001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=0\n=0]\n"),
 
 
 -- SQLNESS ARG pre_cmd=flush
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 -- This query should include SST
 explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348001000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], 
time_range:TimeRange { inclusive_start: Timestamp(1695348001001), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=true\n    iter_num=1\n    merge_iter_0:\n        
init_duration=xxs\n        num_memtables=0\n        num_ssts=1\n        
scan_count=2\n        scan_durat [...]
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > 
TimestampMillisecond(1695348001000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n        
num_memtables=0\n        num_ssts=1\n [...]
 
 
 -- This query should not include SST
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348002000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)], 
time_range:TimeRange { inclusive_start: Timestamp(1695348002001), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=true\n    iter_num=0\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > 
TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348002001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=0\n=0]\n"),
 
 
 -- Table with an 'append' update mode
@@ -97,11 +102,12 @@ affected_rows: 3
 -- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx
 -- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx
 -- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_append_mode_table`
 where t >= 1695348001000 and name = 'ceresdb';
 
 plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=[output_rows=2, elapsed_compute=xxs]\n  ScanTable: 
table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), name = 
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: 
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } 
}\nscan_table:\n    do_merge_sort=false\n    chain_iter_0:\n        
num_memtables=1\n    [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=xx\n  ScanTable: table=03_append_mode_table, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), name = 
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: 
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } 
}\nscan_table:\n    do_merge_sort=false\n    chain_iter_0:\n        
num_memtables= [...]
 
 
 -- Should just fetch projected columns from SST
@@ -111,11 +117,12 @@ String("Plan with Metrics"),String("ProjectionExec: 
expr=[t@0 as t], metrics=[ou
 -- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx
 -- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_append_mode_table`
 where t >= 1695348001000 and name = 'ceresdb';
 
 plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=[output_rows=2, elapsed_compute=xxs]\n  ScanTable: 
table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), name = 
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: 
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } 
}\nscan_table:\n    do_merge_sort=false\n    chain_iter_0:\n        
num_memtables=0\n    [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=xx\n  ScanTable: table=03_append_mode_table, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), name = 
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: 
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } 
}\nscan_table:\n    do_merge_sort=false\n    chain_iter_0:\n        
num_memtables= [...]
 
 
 CREATE TABLE `TEST_QUERY_PRIORITY` (
@@ -132,20 +139,22 @@ affected_rows: 0
 
 -- This query should have higher priority
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select TS from `TEST_QUERY_PRIORITY`
 where TS >= 1695348001000 and TS < 1695348002000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, 
parallelism=8, priority=High, metrics=[\nPredicate { exprs:[TS >= 
TimestampMillisecond(1695348001000, None), TS < 
TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(1695348002000) } }\nscan_table:\n    do_merge_sort=false\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, 
parallelism=8, priority=High, partition_count=UnknownPartitioning(8), 
metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None), 
TS < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(1695348002000) } }\nscan_table:\n    do_merge_sort=false\n=0]\n"),
 
 
 -- This query should have higher priority
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select TS from `TEST_QUERY_PRIORITY`
 where TS >= 1695348001000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, 
parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[TS >= 
TimestampMillisecond(1695348001000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=false\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, 
parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), 
metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None)], 
time_range:TimeRange { inclusive_start: Timestamp(1695348001000), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=false\n=0]\n"),
 
 
 DROP TABLE `03_dml_select_real_time_range`;
diff --git a/integration_tests/cases/env/local/ddl/query-plan.sql 
b/integration_tests/cases/env/local/ddl/query-plan.sql
index 218e0f7b..5217b1a0 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.sql
+++ b/integration_tests/cases/env/local/ddl/query-plan.sql
@@ -18,27 +18,32 @@ INSERT INTO `03_dml_select_real_time_range` (t, name, value)
 
 -- This query should include memtable
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348001000;
 
 -- This query should have higher priority
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_dml_select_real_time_range`
 where t >= 1695348001000 and t < 1695348002000;
 
 -- This query should not include memtable
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348002000;
 
 -- SQLNESS ARG pre_cmd=flush
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 -- This query should include SST
 explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348001000;
 
 -- This query should not include SST
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348002000;
 
@@ -64,6 +69,7 @@ INSERT INTO `03_append_mode_table` (t, name, value)
 -- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx
 -- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx
 -- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_append_mode_table`
 where t >= 1695348001000 and name = 'ceresdb';
 
@@ -74,6 +80,7 @@ where t >= 1695348001000 and name = 'ceresdb';
 -- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx
 -- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select t from `03_append_mode_table`
 where t >= 1695348001000 and name = 'ceresdb';
 
@@ -89,11 +96,13 @@ CREATE TABLE `TEST_QUERY_PRIORITY` (
 
 -- This query should have higher priority
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select TS from `TEST_QUERY_PRIORITY`
 where TS >= 1695348001000 and TS < 1695348002000;
 
 -- This query should have higher priority
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
 explain analyze select TS from `TEST_QUERY_PRIORITY`
 where TS >= 1695348001000;
 
diff --git a/src/analytic_engine/src/instance/reorder_memtable.rs 
b/src/analytic_engine/src/instance/reorder_memtable.rs
index e6eab4d1..c37417bf 100644
--- a/src/analytic_engine/src/instance/reorder_memtable.rs
+++ b/src/analytic_engine/src/instance/reorder_memtable.rs
@@ -147,8 +147,11 @@ impl ExecutionPlan for ScanMemIter {
         }))
     }
 
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
+    fn statistics(
+        &self,
+    ) -> std::result::Result<datafusion::common::Statistics, 
datafusion::error::DataFusionError>
+    {
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
 
@@ -259,8 +262,8 @@ impl Reorder {
     pub async fn into_stream(self) -> 
Result<SendableFetchingRecordBatchStream> {
         // 1. Init datafusion context
         let runtime = Arc::new(RuntimeEnv::default());
-        let state = SessionState::with_config_rt(SessionConfig::new(), 
runtime);
-        let ctx = SessionContext::with_state(state);
+        let state = SessionState::new_with_config_rt(SessionConfig::new(), 
runtime);
+        let ctx = SessionContext::new_with_state(state);
         let table_provider = Arc::new(MemIterProvider {
             arrow_schema: self.schema.to_arrow_schema_ref(),
             iter: Mutex::new(Some(self.iter)),
diff --git a/src/analytic_engine/src/memtable/skiplist/iter.rs 
b/src/analytic_engine/src/memtable/skiplist/iter.rs
index 4787b754..cce3913d 100644
--- a/src/analytic_engine/src/memtable/skiplist/iter.rs
+++ b/src/analytic_engine/src/memtable/skiplist/iter.rs
@@ -154,6 +154,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
         assert!(self.batch_size > 0);
 
         let record_schema = self.row_projector.fetched_schema().clone();
+        let is_empty_projection = record_schema.columns().is_empty();
         let primary_key_indexes = self
             .row_projector
             .primary_key_indexes()
@@ -183,6 +184,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
             }
         }
 
+        if is_empty_projection {
+            builder.inc_row_num(num_rows);
+        }
+
         if num_rows > 0 {
             if let Some(deadline) = self.deadline {
                 let now = Instant::now();
diff --git a/src/analytic_engine/src/row_iter/record_batch_stream.rs 
b/src/analytic_engine/src/row_iter/record_batch_stream.rs
index 2a39c648..49c41f24 100644
--- a/src/analytic_engine/src/row_iter/record_batch_stream.rs
+++ b/src/analytic_engine/src/row_iter/record_batch_stream.rs
@@ -161,6 +161,7 @@ fn filter_record_batch(
     let filter_array = predicate
         .evaluate(record_batch)
         .map(|v| v.into_array(record_batch.num_rows()))
+        .context(FilterExec)?
         .context(FilterExec)?;
     let selected_rows = filter_array
         .as_any()
diff --git a/src/analytic_engine/src/table/mod.rs 
b/src/analytic_engine/src/table/mod.rs
index af381b5b..674f6b3b 100644
--- a/src/analytic_engine/src/table/mod.rs
+++ b/src/analytic_engine/src/table/mod.rs
@@ -430,6 +430,7 @@ pub fn support_pushdown(schema: &Schema, need_dedup: bool, 
col_names: &[String])
     }
 
     // When table need dedup, only unique keys columns support pushdown
+    // See https://github.com/apache/incubator-horaedb/issues/605
     col_names
         .iter()
         .all(|col_name| schema.is_unique_column(col_name.as_str()))
diff --git a/src/common_types/src/datum.rs b/src/common_types/src/datum.rs
index d152e960..9b22439a 100644
--- a/src/common_types/src/datum.rs
+++ b/src/common_types/src/datum.rs
@@ -292,9 +292,11 @@ impl TryFrom<&SqlDataType> for DatumKind {
             SqlDataType::Double => Ok(Self::Double),
             SqlDataType::Boolean => Ok(Self::Boolean),
             SqlDataType::BigInt(_) => Ok(Self::Int64),
+            SqlDataType::Int64 => Ok(Self::Int64),
             SqlDataType::Int(_) => Ok(Self::Int32),
+            SqlDataType::Int8(_) => Ok(Self::Int8),
             SqlDataType::SmallInt(_) => Ok(Self::Int16),
-            SqlDataType::String => Ok(Self::String),
+            SqlDataType::String(_) => Ok(Self::String),
             SqlDataType::Varbinary(_) => Ok(Self::Varbinary),
             SqlDataType::Date => Ok(Self::Date),
             SqlDataType::Time(_, _) => Ok(Self::Time),
@@ -1453,7 +1455,7 @@ impl Datum {
             ScalarValue::Date32(v) => v.map(Datum::Date),
             ScalarValue::Time64Nanosecond(v) => v.map(Datum::Time),
             ScalarValue::Dictionary(_, literal) => 
Datum::from_scalar_value(literal),
-            ScalarValue::List(_, _)
+            ScalarValue::List(_)
             | ScalarValue::Date64(_)
             | ScalarValue::Time32Second(_)
             | ScalarValue::Time32Millisecond(_)
@@ -1467,10 +1469,12 @@ impl Datum {
             | ScalarValue::Decimal128(_, _, _)
             | ScalarValue::Null
             | ScalarValue::IntervalMonthDayNano(_)
-            | ScalarValue::Fixedsizelist(_, _, _)
+            | ScalarValue::FixedSizeList(_)
             | ScalarValue::DurationSecond(_)
             | ScalarValue::DurationMillisecond(_)
             | ScalarValue::DurationMicrosecond(_)
+            | ScalarValue::Decimal256(_, _, _)
+            | ScalarValue::LargeList(_)
             | ScalarValue::DurationNanosecond(_) => None,
         }
     }
@@ -1502,7 +1506,7 @@ impl<'a> DatumView<'a> {
                 v.map(|v| DatumView::Timestamp(Timestamp::new(v)))
             }
             ScalarValue::Dictionary(_, literal) => 
DatumView::from_scalar_value(literal),
-            ScalarValue::List(_, _)
+            ScalarValue::List(_)
             | ScalarValue::Date64(_)
             | ScalarValue::Time32Second(_)
             | ScalarValue::Time32Millisecond(_)
@@ -1516,10 +1520,12 @@ impl<'a> DatumView<'a> {
             | ScalarValue::Decimal128(_, _, _)
             | ScalarValue::Null
             | ScalarValue::IntervalMonthDayNano(_)
-            | ScalarValue::Fixedsizelist(_, _, _)
+            | ScalarValue::FixedSizeList(_)
             | ScalarValue::DurationSecond(_)
             | ScalarValue::DurationMillisecond(_)
             | ScalarValue::DurationMicrosecond(_)
+            | ScalarValue::Decimal256(_, _, _)
+            | ScalarValue::LargeList(_)
             | ScalarValue::DurationNanosecond(_) => None,
         }
     }
diff --git a/src/common_types/src/projected_schema.rs 
b/src/common_types/src/projected_schema.rs
index 30e9eb01..1eff7dc4 100644
--- a/src/common_types/src/projected_schema.rs
+++ b/src/common_types/src/projected_schema.rs
@@ -105,7 +105,7 @@ pub struct RowProjector {
     /// For example:
     ///   source columns in sst: 0,1,2,3,4
     ///   target projection columns: 2,1,3
-    ///   
+    ///
     ///   the actual columns in fetched record: 1,2,3
     ///   relative columns indexes in fetched record: 0,1,2
     ///
@@ -347,6 +347,10 @@ impl ProjectedSchema {
     pub fn table_schema(&self) -> &Schema {
         &self.0.table_schema
     }
+
+    pub fn target_column_schema(&self, i: usize) -> &ColumnSchema {
+        self.0.target_record_schema.column(i)
+    }
 }
 
 impl From<ProjectedSchema> for horaedbproto::schema::ProjectedSchema {
diff --git a/src/common_types/src/record_batch.rs 
b/src/common_types/src/record_batch.rs
index 2a543ca5..0278aa70 100644
--- a/src/common_types/src/record_batch.rs
+++ b/src/common_types/src/record_batch.rs
@@ -24,7 +24,7 @@ use arrow::{
     compute,
     datatypes::{DataType, Field, Schema, SchemaRef as ArrowSchemaRef, 
TimeUnit},
     error::ArrowError,
-    record_batch::RecordBatch as ArrowRecordBatch,
+    record_batch::{RecordBatch as ArrowRecordBatch, RecordBatchOptions},
 };
 use arrow_ext::operation;
 use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
@@ -124,14 +124,18 @@ pub struct RecordBatchData {
 }
 
 impl RecordBatchData {
-    fn new(arrow_schema: ArrowSchemaRef, column_blocks: Vec<ColumnBlock>) -> 
Result<Self> {
+    fn new(
+        arrow_schema: ArrowSchemaRef,
+        column_blocks: Vec<ColumnBlock>,
+        options: RecordBatchOptions,
+    ) -> Result<Self> {
         let arrays = column_blocks
             .iter()
             .map(|column| column.to_arrow_array_ref())
-            .collect();
-
+            .collect::<Vec<_>>();
         let arrow_record_batch =
-            ArrowRecordBatch::try_new(arrow_schema, 
arrays).context(CreateArrow)?;
+            ArrowRecordBatch::try_new_with_options(arrow_schema, arrays, 
&options)
+                .context(CreateArrow)?;
 
         Ok(RecordBatchData {
             arrow_record_batch,
@@ -140,10 +144,7 @@ impl RecordBatchData {
     }
 
     fn num_rows(&self) -> usize {
-        self.column_blocks
-            .first()
-            .map(|column| column.num_rows())
-            .unwrap_or(0)
+        self.arrow_record_batch.num_rows()
     }
 
     fn take_column_block(&mut self, index: usize) -> ColumnBlock {
@@ -227,9 +228,13 @@ impl RecordBatch {
         }
     }
 
-    pub fn new(schema: RecordSchema, column_blocks: Vec<ColumnBlock>) -> 
Result<Self> {
+    pub fn new(
+        schema: RecordSchema,
+        column_blocks: Vec<ColumnBlock>,
+        num_rows: usize,
+    ) -> Result<Self> {
         ensure!(schema.num_columns() == column_blocks.len(), SchemaLen);
-
+        let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
         // Validate schema and column_blocks.
         for (column_schema, column_block) in 
schema.columns().iter().zip(column_blocks.iter()) {
             ensure!(
@@ -243,7 +248,7 @@ impl RecordBatch {
         }
 
         let arrow_schema = schema.to_arrow_schema_ref();
-        let data = RecordBatchData::new(arrow_schema, column_blocks)?;
+        let data = RecordBatchData::new(arrow_schema, column_blocks, options)?;
 
         Ok(Self { schema, data })
     }
@@ -388,6 +393,7 @@ impl FetchedRecordBatch {
         let mut column_blocks = 
Vec::with_capacity(fetched_schema.num_columns());
         let num_rows = arrow_record_batch.num_rows();
         let num_columns = arrow_record_batch.num_columns();
+        let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
         for (col_idx_opt, col_schema) in 
column_indexes.iter().zip(fetched_schema.columns()) {
             match col_idx_opt {
                 Some(col_idx) => {
@@ -419,7 +425,8 @@ impl FetchedRecordBatch {
             }
         }
 
-        let data = RecordBatchData::new(fetched_schema.to_arrow_schema_ref(), 
column_blocks)?;
+        let data =
+            RecordBatchData::new(fetched_schema.to_arrow_schema_ref(), 
column_blocks, options)?;
 
         Ok(FetchedRecordBatch {
             schema: fetched_schema,
@@ -471,6 +478,8 @@ impl FetchedRecordBatch {
         // Get the schema after projection.
         let record_schema = projected_schema.to_record_schema();
         let mut column_blocks = 
Vec::with_capacity(record_schema.num_columns());
+        let num_rows = self.data.num_rows();
+        let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
 
         for column_schema in record_schema.columns() {
             let column_index =
@@ -485,8 +494,8 @@ impl FetchedRecordBatch {
             column_blocks.push(column_block);
         }
 
-        let data = RecordBatchData::new(record_schema.to_arrow_schema_ref(), 
column_blocks)?;
-
+        let data =
+            RecordBatchData::new(record_schema.to_arrow_schema_ref(), 
column_blocks, options)?;
         Ok(RecordBatch {
             schema: record_schema,
             data,
@@ -582,6 +591,7 @@ pub struct FetchedRecordBatchBuilder {
     fetched_schema: RecordSchema,
     primary_key_indexes: Option<Vec<usize>>,
     builders: Vec<ColumnBlockBuilder>,
+    num_rows: usize,
 }
 
 impl FetchedRecordBatchBuilder {
@@ -601,6 +611,7 @@ impl FetchedRecordBatchBuilder {
             fetched_schema,
             primary_key_indexes,
             builders,
+            num_rows: 0,
         }
     }
 
@@ -624,6 +635,7 @@ impl FetchedRecordBatchBuilder {
             fetched_schema: record_schema,
             primary_key_indexes,
             builders,
+            num_rows: 0,
         }
     }
 
@@ -671,6 +683,13 @@ impl FetchedRecordBatchBuilder {
         Ok(())
     }
 
+    /// When the record batch contains no column, its row num may not be 0, so
+    /// we need to inc row num explicitly in this case.
+    /// See: https://github.com/apache/arrow-datafusion/pull/7920
+    pub fn inc_row_num(&mut self, n: usize) {
+        self.num_rows += n;
+    }
+
     /// Append `len` from `start` (inclusive) to this builder.
     ///
     /// REQUIRE:
@@ -702,7 +721,7 @@ impl FetchedRecordBatchBuilder {
         self.builders
             .first()
             .map(|builder| builder.len())
-            .unwrap_or(0)
+            .unwrap_or(self.num_rows)
     }
 
     /// Returns true if the builder is empty.
@@ -725,11 +744,16 @@ impl FetchedRecordBatchBuilder {
             .map(|builder| builder.build())
             .collect();
         let arrow_schema = self.fetched_schema.to_arrow_schema_ref();
+        let num_rows = column_blocks
+            .first()
+            .map(|block| block.num_rows())
+            .unwrap_or(self.num_rows);
+        let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
 
         Ok(FetchedRecordBatch {
             schema: self.fetched_schema.clone(),
             primary_key_indexes: self.primary_key_indexes.clone(),
-            data: RecordBatchData::new(arrow_schema, column_blocks)?,
+            data: RecordBatchData::new(arrow_schema, column_blocks, options)?,
         })
     }
 }
diff --git a/src/components/parquet_ext/src/meta_data.rs 
b/src/components/parquet_ext/src/meta_data.rs
index 00a0bb3a..ad18a36c 100644
--- a/src/components/parquet_ext/src/meta_data.rs
+++ b/src/components/parquet_ext/src/meta_data.rs
@@ -19,9 +19,10 @@ use std::{ops::Range, sync::Arc};
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
 use generic_error::GenericResult;
 use parquet::{
-    arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder},
+    arrow::arrow_reader::ArrowReaderOptions,
     errors::{ParquetError, Result},
     file::{footer, metadata::ParquetMetaData},
 };
diff --git a/src/components/parquet_ext/src/prune/min_max.rs 
b/src/components/parquet_ext/src/prune/min_max.rs
index 8ea39299..0a717021 100644
--- a/src/components/parquet_ext/src/prune/min_max.rs
+++ b/src/components/parquet_ext/src/prune/min_max.rs
@@ -230,7 +230,7 @@ mod test {
     }
 
     fn prepare_parquet_schema_descr(schema: &ArrowSchema) -> SchemaDescPtr {
-        let mut fields = schema
+        let fields = schema
             .fields()
             .iter()
             .map(|field| {
@@ -245,7 +245,7 @@ mod test {
             })
             .collect();
         let schema = SchemaType::group_type_builder("schema")
-            .with_fields(&mut fields)
+            .with_fields(fields)
             .build()
             .unwrap();
 
diff --git a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs 
b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index feba491f..dd430f52 100644
--- a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -129,8 +129,10 @@ impl ExecutionPlan for UnresolvedPartitionedScan {
         ))
     }
 
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
+    fn statistics(
+        &self,
+    ) -> Result<datafusion::common::Statistics, 
datafusion::error::DataFusionError> {
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
 
@@ -367,8 +369,10 @@ impl ExecutionPlan for ResolvedPartitionedScan {
         Ok(Box::pin(record_stream))
     }
 
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
+    fn statistics(
+        &self,
+    ) -> Result<datafusion::common::Statistics, 
datafusion::error::DataFusionError> {
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
@@ -578,8 +582,10 @@ impl ExecutionPlan for UnresolvedSubTableScan {
         ))
     }
 
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
+    fn statistics(
+        &self,
+    ) -> Result<datafusion::common::Statistics, 
datafusion::error::DataFusionError> {
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
 
diff --git a/src/df_engine_extensions/src/dist_sql_query/test_util.rs 
b/src/df_engine_extensions/src/dist_sql_query/test_util.rs
index 1f4e788f..c42f9e38 100644
--- a/src/df_engine_extensions/src/dist_sql_query/test_util.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -490,8 +490,10 @@ impl ExecutionPlan for MockScan {
         unimplemented!()
     }
 
-    fn statistics(&self) -> datafusion::physical_plan::Statistics {
-        unimplemented!()
+    fn statistics(&self) -> DfResult<datafusion::physical_plan::Statistics> {
+        Ok(datafusion::physical_plan::Statistics::new_unknown(
+            &self.schema(),
+        ))
     }
 }
 
diff --git a/src/df_operator/src/scalar.rs b/src/df_operator/src/scalar.rs
index 1535ebdb..58e8214c 100644
--- a/src/df_operator/src/scalar.rs
+++ b/src/df_operator/src/scalar.rs
@@ -31,6 +31,7 @@ pub struct ScalarUdf {
 }
 
 impl ScalarUdf {
+    #[allow(deprecated)]
     pub fn create(name: &str, func: ScalarFunction) -> Self {
         let signature = func.signature().to_datafusion_signature();
         let return_type = func.return_type().to_datafusion_return_type();
@@ -43,7 +44,7 @@ impl ScalarUdf {
 
     #[inline]
     pub fn name(&self) -> &str {
-        &self.df_udf.name
+        self.df_udf.name()
     }
 
     /// Convert into datafusion's udf
diff --git a/src/df_operator/src/udaf.rs b/src/df_operator/src/udaf.rs
index 448a26c6..44f39136 100644
--- a/src/df_operator/src/udaf.rs
+++ b/src/df_operator/src/udaf.rs
@@ -31,6 +31,7 @@ pub struct AggregateUdf {
 }
 
 impl AggregateUdf {
+    #[allow(deprecated)]
     pub fn create(name: &str, func: AggregateFunction) -> Self {
         let signature = func.signature().to_datafusion_signature();
         let return_type = func.return_type().to_datafusion_return_type();
@@ -50,7 +51,7 @@ impl AggregateUdf {
 
     #[inline]
     pub fn name(&self) -> &str {
-        &self.df_udaf.name
+        self.df_udaf.name()
     }
 
     #[inline]
diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs
index cac5af0c..cc455b3f 100644
--- a/src/interpreters/src/insert.rs
+++ b/src/interpreters/src/insert.rs
@@ -374,5 +374,5 @@ fn get_or_extract_column_from_row_groups(
             Ok(columnar_value)
         })?;
 
-    Ok(column.into_array(num_rows))
+    column.into_array(num_rows).context(DatafusionExecutor)
 }
diff --git a/src/interpreters/src/tests.rs b/src/interpreters/src/tests.rs
index 6d521738..f9c8c75b 100644
--- a/src/interpreters/src/tests.rs
+++ b/src/interpreters/src/tests.rs
@@ -117,7 +117,7 @@ where
             .enable_partition_table_access(enable_partition_table_access)
             .build();
         let sql= format!("CREATE TABLE IF NOT EXISTS {table_name}(c1 string 
tag not null,ts timestamp not null, c3 string, timestamp key(ts),primary 
key(c1, ts)) \
-        ENGINE=Analytic WITH 
(ttl='70d',update_mode='overwrite',arena_block_size='1KB')");
+        ENGINE=Analytic WITH 
(enable_ttl='false',update_mode='overwrite',arena_block_size='1KB')");
 
         let output = self.sql_to_output_with_context(&sql, ctx).await?;
         assert!(
@@ -157,7 +157,7 @@ where
             .enable_partition_table_access(enable_partition_table_access)
             .build();
         let sql = format!("select * from {table_name}");
-        let output = self.sql_to_output_with_context(&sql, ctx).await?;
+        let output = self.sql_to_output_with_context(&sql, ctx.clone()).await?;
         let records = output.try_into().unwrap();
         let expected = vec![
             
"+------------+---------------------+--------+--------+------------+--------------+",
@@ -169,15 +169,15 @@ where
         ];
         test_util::assert_record_batches_eq(&expected, records);
 
-        let sql = "select count(*) from test_table";
-        let output = self.sql_to_output(sql).await?;
+        let sql = format!("select count(*) from {table_name}");
+        let output = self.sql_to_output_with_context(&sql, ctx).await?;
         let records = output.try_into().unwrap();
         let expected = vec![
-            "+-----------------+",
-            "| COUNT(UInt8(1)) |",
-            "+-----------------+",
-            "| 2               |",
-            "+-----------------+",
+            "+----------+",
+            "| COUNT(*) |",
+            "+----------+",
+            "| 2        |",
+            "+----------+",
         ];
         test_util::assert_record_batches_eq(&expected, records);
 
diff --git a/src/proxy/src/grpc/prom_query.rs b/src/proxy/src/grpc/prom_query.rs
index 1c999ad0..673b6131 100644
--- a/src/proxy/src/grpc/prom_query.rs
+++ b/src/proxy/src/grpc/prom_query.rs
@@ -471,7 +471,7 @@ mod tests {
         let schema = build_schema();
         let record_schema = schema.to_record_schema();
         let column_blocks = build_column_block();
-        let record_batch = RecordBatch::new(record_schema, 
column_blocks).unwrap();
+        let record_batch = RecordBatch::new(record_schema, column_blocks, 
4).unwrap();
 
         let column_name = ColumnNames {
             timestamp: "timestamp".to_string(),
diff --git a/src/proxy/src/influxdb/types.rs b/src/proxy/src/influxdb/types.rs
index 117b5cf3..488f5ded 100644
--- a/src/proxy/src/influxdb/types.rs
+++ b/src/proxy/src/influxdb/types.rs
@@ -744,7 +744,7 @@ mod tests {
     fn test_influxql_result() {
         let record_schema = build_test_record_schema();
         let column_blocks = build_test_column_blocks();
-        let record_batch = RecordBatch::new(record_schema, 
column_blocks).unwrap();
+        let record_batch = RecordBatch::new(record_schema, column_blocks, 
7).unwrap();
 
         let mut builder = InfluxqlResultBuilder::new(record_batch.schema(), 
0).unwrap();
         builder.add_record_batch(record_batch).unwrap();
diff --git a/src/query_engine/src/datafusion_impl/mod.rs 
b/src/query_engine/src/datafusion_impl/mod.rs
index 48e42c21..482628f8 100644
--- a/src/query_engine/src/datafusion_impl/mod.rs
+++ b/src/query_engine/src/datafusion_impl/mod.rs
@@ -137,7 +137,7 @@ impl DfContextBuilder {
 
         // Using default logcial optimizer, if want to add more custom rule, 
using
         // `add_optimizer_rule` to add.
-        let state = SessionState::with_config_rt(df_session_config, 
self.runtime_env.clone());
-        SessionContext::with_state(state)
+        let state = SessionState::new_with_config_rt(df_session_config, 
self.runtime_env.clone());
+        SessionContext::new_with_state(state)
     }
 }
diff --git 
a/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs 
b/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs
index c963c75f..d1406a75 100644
--- a/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs
+++ b/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs
@@ -21,7 +21,9 @@ use std::sync::Arc;
 
 use datafusion::{
     config::ConfigOptions,
-    physical_optimizer::{optimizer::PhysicalOptimizerRule, 
repartition::Repartition},
+    physical_optimizer::{
+        enforce_distribution::EnforceDistribution, 
optimizer::PhysicalOptimizerRule,
+    },
     physical_plan::ExecutionPlan,
 };
 use logger::debug;
@@ -34,7 +36,7 @@ pub struct RepartitionAdapter {
 
 impl Adapter for RepartitionAdapter {
     fn may_adapt(original_rule: OptimizeRuleRef) -> OptimizeRuleRef {
-        if original_rule.name() == Repartition::new().name() {
+        if original_rule.name() == EnforceDistribution::new().name() {
             Arc::new(Self { original_rule })
         } else {
             original_rule
diff --git 
a/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs 
b/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
index a5a6161c..3b1a0cd9 100644
--- a/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
+++ b/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
@@ -37,7 +37,7 @@ use common_types::{
     time::{TimeRange, Timestamp},
 };
 use datafusion::{
-    error::{DataFusionError, Result as ArrowResult},
+    error::{DataFusionError, Result as DataFusionResult},
     execution::context::TaskContext,
     physical_expr::PhysicalSortExpr,
     physical_plan::{
@@ -93,15 +93,15 @@ impl PhysicalExpr for ExtractTsidExpr {
         self
     }
 
-    fn data_type(&self, _input_schema: &ArrowSchema) -> ArrowResult<DataType> {
+    fn data_type(&self, _input_schema: &ArrowSchema) -> 
DataFusionResult<DataType> {
         Ok(DataType::UInt64)
     }
 
-    fn nullable(&self, _input_schema: &ArrowSchema) -> ArrowResult<bool> {
+    fn nullable(&self, _input_schema: &ArrowSchema) -> DataFusionResult<bool> {
         Ok(false)
     }
 
-    fn evaluate(&self, batch: &RecordBatch) -> ArrowResult<ColumnarValue> {
+    fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> 
{
         let tsid_idx = batch
             .schema()
             .index_of(TSID_COLUMN)
@@ -116,7 +116,7 @@ impl PhysicalExpr for ExtractTsidExpr {
     fn with_new_children(
         self: Arc<Self>,
         _children: Vec<Arc<dyn PhysicalExpr>>,
-    ) -> ArrowResult<Arc<dyn PhysicalExpr>> {
+    ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
         Ok(self)
     }
 
@@ -204,7 +204,7 @@ impl ExecutionPlan for PromAlignExec {
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> ArrowResult<Arc<dyn ExecutionPlan>> {
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
         match children.len() {
             1 => Ok(Arc::new(PromAlignExec {
                 input: children[0].clone(),
@@ -222,7 +222,7 @@ impl ExecutionPlan for PromAlignExec {
         &self,
         partition: usize,
         context: Arc<TaskContext>,
-    ) -> ArrowResult<DfSendableRecordBatchStream> {
+    ) -> DataFusionResult<DfSendableRecordBatchStream> {
         debug!("PromAlignExec: partition:{}", partition);
         Ok(Box::pin(PromAlignReader {
             input: self.input.execute(partition, context)?,
@@ -236,9 +236,9 @@ impl ExecutionPlan for PromAlignExec {
         }))
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> DataFusionResult<Statistics> {
         // TODO(chenxiang)
-        Statistics::default()
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
 
diff --git a/src/query_engine/src/datafusion_impl/task_context.rs 
b/src/query_engine/src/datafusion_impl/task_context.rs
index aee98128..d1ea667d 100644
--- a/src/query_engine/src/datafusion_impl/task_context.rs
+++ b/src/query_engine/src/datafusion_impl/task_context.rs
@@ -40,7 +40,6 @@ use df_engine_extensions::dist_sql_query::{
 };
 use futures::future::BoxFuture;
 use generic_error::BoxError;
-use prost::Message;
 use runtime::Priority;
 use snafu::ResultExt;
 use table_engine::{
@@ -116,7 +115,7 @@ impl Preprocessor {
         ctx: &Context,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Decode to datafusion physical plan.
-        let protobuf = protobuf::PhysicalPlanNode::decode(encoded_plan)
+        let protobuf = protobuf::PhysicalPlanNode::try_decode(encoded_plan)
             .box_err()
             .with_context(|| ExecutorWithCause {
                 msg: Some("failed to decode plan".to_string()),
diff --git a/src/query_frontend/src/influxql/planner.rs 
b/src/query_frontend/src/influxql/planner.rs
index 3b21228a..ed8d9c14 100644
--- a/src/query_frontend/src/influxql/planner.rs
+++ b/src/query_frontend/src/influxql/planner.rs
@@ -57,7 +57,7 @@ struct InfluxQLSchemaProvider<'a, P: MetaProvider> {
 impl<'a, P: MetaProvider> SchemaProvider for InfluxQLSchemaProvider<'a, P> {
     fn get_table_provider(&self, name: &str) -> 
datafusion::error::Result<Arc<dyn TableSource>> {
         self.context_provider
-            .get_table_provider(name.into())
+            .get_table_source(name.into())
             .map_err(|e| {
                 DataFusionError::Plan(format!(
                     "measurement does not exist, measurement:{name}, 
source:{e}"
diff --git a/src/query_frontend/src/logical_optimizer/mod.rs 
b/src/query_frontend/src/logical_optimizer/mod.rs
index 4d62e877..8f2bf42a 100644
--- a/src/query_frontend/src/logical_optimizer/mod.rs
+++ b/src/query_frontend/src/logical_optimizer/mod.rs
@@ -30,7 +30,8 @@ use datafusion::{
 use type_conversion::TypeConversion;
 
 pub fn optimize_plan(plan: &LogicalPlan) -> Result<LogicalPlan> {
-    let state = SessionState::with_config_rt(SessionConfig::new(), 
Arc::new(RuntimeEnv::default()));
+    let state =
+        SessionState::new_with_config_rt(SessionConfig::new(), 
Arc::new(RuntimeEnv::default()));
     let state = register_analyzer_rules(state);
     // Register iox optimizers, used by influxql.
     let state = 
influxql_query::logical_optimizer::register_iox_logical_optimizers(state);
diff --git a/src/query_frontend/src/logical_optimizer/type_conversion.rs 
b/src/query_frontend/src/logical_optimizer/type_conversion.rs
index 89f0a14e..0aeaaba2 100644
--- a/src/query_frontend/src/logical_optimizer/type_conversion.rs
+++ b/src/query_frontend/src/logical_optimizer/type_conversion.rs
@@ -30,7 +30,7 @@ use datafusion::{
     logical_expr::{
         expr::{Expr, InList},
         logical_plan::{Filter, LogicalPlan, TableScan},
-        utils, Between, BinaryExpr, ExprSchemable, Operator,
+        Between, BinaryExpr, ExprSchemable, Operator,
     },
     optimizer::analyzer::AnalyzerRule,
     scalar::ScalarValue,
@@ -113,17 +113,18 @@ impl AnalyzerRule for TypeConversion {
                     .map(|plan| self.analyze(plan.clone(), config))
                     .collect::<Result<Vec<_>>>()?;
 
-                let expr = plan
+                let exprs = plan
                     .expressions()
                     .into_iter()
                     .map(|e| e.rewrite(&mut rewriter))
                     .collect::<Result<Vec<_>>>()?;
 
-                Ok(utils::from_plan(&plan, &expr, &new_inputs)?)
+                Ok(LogicalPlan::with_new_exprs(&plan, exprs, &new_inputs)?)
             }
             LogicalPlan::Subquery(_)
             | LogicalPlan::Statement { .. }
             | LogicalPlan::SubqueryAlias(_)
+            | LogicalPlan::Copy(_)
             | LogicalPlan::Unnest(_)
             | LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
         }
@@ -209,7 +210,7 @@ impl<'a> TypeRewriter<'a> {
             }
         }
 
-        let array = value.to_array();
+        let array = value.to_array()?;
         ScalarValue::try_from_array(
             &compute::cast(&array, 
data_type).map_err(DataFusionError::ArrowError)?,
             // index: Converts a value in `array` at `index` into a ScalarValue
diff --git a/src/query_frontend/src/parser.rs b/src/query_frontend/src/parser.rs
index e01c4d03..23efa0ad 100644
--- a/src/query_frontend/src/parser.rs
+++ b/src/query_frontend/src/parser.rs
@@ -352,7 +352,7 @@ impl<'a> Parser<'a> {
                     is_dictionary = true;
                 }
             }
-            if c.data_type != DataType::String && is_dictionary {
+            if !matches!(c.data_type, DataType::String(_)) && is_dictionary {
                 return parser_err!(format!(
                     "Only string column can be dictionary encoded: {:?}",
                     c.to_string()
@@ -1001,7 +1001,7 @@ mod tests {
         let columns = vec![
             make_column_def("c1", DataType::Timestamp(None, 
TimezoneInfo::None)),
             make_column_def("c2", DataType::Double),
-            make_column_def("c3", DataType::String),
+            make_column_def("c3", DataType::String(None)),
         ];
 
         let sql = "CREATE TABLE mytbl(c1 timestamp, c2 double, c3 string,) 
ENGINE = XX";
@@ -1027,7 +1027,7 @@ mod tests {
         let columns = vec![
             make_column_def("c1", DataType::Timestamp(None, 
TimezoneInfo::None)),
             make_comment_column_def("c2", DataType::Double, "id".to_string()),
-            make_comment_column_def("c3", DataType::String, 
"name".to_string()),
+            make_comment_column_def("c3", DataType::String(None), 
"name".to_string()),
         ];
 
         let sql = "CREATE TABLE mytbl(c1 timestamp, c2 double comment 'id', c3 
string comment 'name',) ENGINE = XX";
@@ -1053,7 +1053,7 @@ mod tests {
         let columns = vec![
             make_column_def("c1", DataType::Timestamp(None, 
TimezoneInfo::None)),
             make_column_def("c2", DataType::Timestamp(None, 
TimezoneInfo::None)),
-            make_column_def("c3", DataType::String),
+            make_column_def("c3", DataType::String(None)),
             make_column_def("c4", DataType::Double),
         ];
 
@@ -1253,7 +1253,7 @@ mod tests {
                 table_name: make_table_name("t"),
                 columns: vec![
                     make_column_def("c1", DataType::Double),
-                    make_column_def("c2", DataType::String),
+                    make_column_def("c2", DataType::String(None)),
                 ],
             });
             expect_parse_ok(sql, expected).unwrap();
@@ -1277,7 +1277,7 @@ mod tests {
                 table_name: make_table_name("t"),
                 columns: vec![
                     make_column_def("c1", DataType::Double),
-                    make_tag_column_def("c2", DataType::String),
+                    make_tag_column_def("c2", DataType::String(None)),
                 ],
             });
             expect_parse_ok(sql, expected).unwrap();
@@ -1287,7 +1287,7 @@ mod tests {
             let sql = "ALTER TABLE t ADD COLUMN c1 string tag";
             let expected = Statement::AlterAddColumn(AlterAddColumn {
                 table_name: make_table_name("t"),
-                columns: vec![make_tag_column_def("c1", DataType::String)],
+                columns: vec![make_tag_column_def("c1", 
DataType::String(None))],
             });
             expect_parse_ok(sql, expected).unwrap();
         }
diff --git a/src/query_frontend/src/promql/convert.rs 
b/src/query_frontend/src/promql/convert.rs
index 297e7161..f364a0b1 100644
--- a/src/query_frontend/src/promql/convert.rs
+++ b/src/query_frontend/src/promql/convert.rs
@@ -24,7 +24,7 @@ use common_types::{
 use datafusion::{
     logical_expr::{
         avg, count,
-        expr::{Alias, ScalarUDF},
+        expr::{Alias, ScalarFunction},
         lit,
         logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
         max, min, sum, Expr as DataFusionExpr,
@@ -316,11 +316,10 @@ impl Expr {
                         // TSID is lost after aggregate, but PromAlignNode 
need a unique id, so
                         // mock UUID as tsid based on groupby keys
                         DataFusionExpr::Alias(Alias {
-                            expr: Box::new(DataFusionExpr::ScalarUDF(ScalarUDF 
{
-                                fun: 
Arc::new(create_unique_id(tag_exprs.len())),
-                                args: tag_exprs.clone(),
-                            })),
+                            expr: Box::new(DataFusionExpr::ScalarFunction(
+                                
ScalarFunction::new_udf(Arc::new(create_unique_id(tag_exprs.len())), 
tag_exprs.clone()))),
                             name: TSID_COLUMN.to_string(),
+                            relation: None,
                         });
                     let mut projection = tag_exprs.clone();
                     projection.extend(vec![
@@ -371,6 +370,7 @@ impl Expr {
         Ok(DataFusionExpr::Alias(Alias {
             expr: Box::new(expr),
             name: alias,
+            relation: None,
         }))
     }
 }
@@ -578,7 +578,7 @@ impl Selector {
             .context(TableNotFound { name: &table })?;
 
         let table_provider = meta_provider
-            .get_table_provider(table_ref.table.name().into())
+            .get_table_source(table_ref.table.name().into())
             .context(TableProviderNotFound { name: &table })?;
         let schema = 
Schema::try_from(table_provider.schema()).context(BuildTableSchema)?;
         let timestamp_column_name = schema.timestamp_name().to_string();
diff --git a/src/query_frontend/src/promql/remote.rs 
b/src/query_frontend/src/promql/remote.rs
index c687b51d..c3c1439e 100644
--- a/src/query_frontend/src/promql/remote.rs
+++ b/src/query_frontend/src/promql/remote.rs
@@ -64,7 +64,7 @@ pub fn remote_query_to_plan<P: MetaProvider>(
     let (metric, field, mut filters) = normalize_matchers(query.matchers)?;
 
     let table_provider = meta_provider
-        .get_table_provider(TableReference::bare(&metric))
+        .get_table_source(TableReference::bare(&metric))
         .context(TableProviderNotFound { name: &metric })?;
     let schema = 
Schema::try_from(table_provider.schema()).context(BuildTableSchema)?;
     let timestamp_col_name = schema.timestamp_name();
diff --git a/src/query_frontend/src/provider.rs 
b/src/query_frontend/src/provider.rs
index 4380829f..64647254 100644
--- a/src/query_frontend/src/provider.rs
+++ b/src/query_frontend/src/provider.rs
@@ -320,7 +320,7 @@ impl<'a, P: MetaProvider> MetaProvider for 
ContextProviderAdapter<'a, P> {
 }
 
 impl<'a, P: MetaProvider> ContextProvider for ContextProviderAdapter<'a, P> {
-    fn get_table_provider(
+    fn get_table_source(
         &self,
         name: TableReference,
     ) -> std::result::Result<Arc<(dyn TableSource + 'static)>, 
DataFusionError> {
diff --git a/src/table_engine/src/memory.rs b/src/table_engine/src/memory.rs
index 68967705..20cfe583 100644
--- a/src/table_engine/src/memory.rs
+++ b/src/table_engine/src/memory.rs
@@ -260,7 +260,7 @@ fn row_group_to_record_batch(
         column_blocks.push(column_block);
     }
 
-    RecordBatch::new(record_schema.clone(), column_blocks)
+    RecordBatch::new(record_schema.clone(), column_blocks, rows.num_rows())
         .box_err()
         .context(ErrWithSource {
             msg: "failed to create RecordBatch",
diff --git a/src/table_engine/src/predicate.rs 
b/src/table_engine/src/predicate.rs
index 723724f3..b316b99e 100644
--- a/src/table_engine/src/predicate.rs
+++ b/src/table_engine/src/predicate.rs
@@ -329,6 +329,8 @@ impl<'a> TimeRangeExtractor<'a> {
             | Operator::BitwiseAnd
             | Operator::BitwiseOr
             | Operator::BitwiseXor
+            | Operator::AtArrow
+            | Operator::ArrowAt
             | Operator::BitwiseShiftRight
             | Operator::BitwiseShiftLeft
             | Operator::StringConcat => TimeRange::min_to_max(),
@@ -427,20 +429,18 @@ impl<'a> TimeRangeExtractor<'a> {
             | Expr::IsUnknown(_)
             | Expr::IsNotUnknown(_)
             | Expr::Negative(_)
+            | Expr::AggregateUDF(_)
             | Expr::Case { .. }
             | Expr::Cast { .. }
             | Expr::TryCast { .. }
             | Expr::Sort { .. }
             | Expr::ScalarFunction { .. }
-            | Expr::ScalarUDF { .. }
             | Expr::AggregateFunction { .. }
             | Expr::WindowFunction { .. }
-            | Expr::AggregateUDF { .. }
             | Expr::Wildcard { .. }
             | Expr::Exists { .. }
             | Expr::InSubquery { .. }
             | Expr::ScalarSubquery(_)
-            | Expr::QualifiedWildcard { .. }
             | Expr::GroupingSet(_)
             | Expr::GetIndexedField { .. }
             | Expr::OuterReferenceColumn { .. }
diff --git a/src/table_engine/src/provider.rs b/src/table_engine/src/provider.rs
index d5e4c69f..bcca5ba8 100644
--- a/src/table_engine/src/provider.rs
+++ b/src/table_engine/src/provider.rs
@@ -19,6 +19,7 @@
 
 use std::{
     any::Any,
+    collections::HashSet,
     fmt,
     sync::{Arc, Mutex},
     time::{Duration, Instant},
@@ -35,8 +36,10 @@ use datafusion::{
     logical_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType},
     physical_expr::PhysicalSortExpr,
     physical_plan::{
+        expressions,
         metrics::{Count, MetricValue, MetricsSet},
-        DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning,
+        projection::ProjectionExec,
+        DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning, 
PhysicalExpr,
         SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
     },
 };
@@ -230,9 +233,34 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
             priority,
         );
 
+        let mut need_reprojection = false;
+        let all_projections = if let Some(proj) = projection {
+            let mut original_projections = proj.clone();
+            let projections_from_filter =
+                collect_projection_from_expr(filters, 
&self.current_table_schema);
+            for proj in projections_from_filter {
+                if !original_projections.contains(&proj) {
+                    original_projections.push(proj);
+                    // If the projection from filters have columns not in the 
original projection,
+                    // we need to add it to projection, and add a 
ProjectionExec plan to project the
+                    // orignal columns. Eg:
+                    // ```text
+                    // select a from table where b > 1
+                    // ```
+                    // The original projection only contains a, but the filter 
has column b, so we
+                    // need to query both a and b column from table but only
+                    // output a column. More details can be found in:
+                    // 
https://github.com/apache/arrow-datafusion/pull/9131#pullrequestreview-1865020767
+                    need_reprojection = true;
+                }
+            }
+            Some(original_projections)
+        } else {
+            None
+        };
         let predicate = self.check_and_build_predicate_from_filters(filters);
         let projected_schema =
-            ProjectedSchema::new(self.current_table_schema.clone(), 
projection.cloned()).map_err(
+            ProjectedSchema::new(self.current_table_schema.clone(), 
all_projections).map_err(
                 |e| {
                     DataFusionError::Internal(format!(
                         "Invalid projection, plan:{self:?}, 
projection:{projection:?}, err:{e:?}"
@@ -240,6 +268,22 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
                 },
             )?;
 
+        let projection_exprs = if need_reprojection {
+            let original_projection = projection.unwrap();
+            let exprs = (0..original_projection.len())
+                .map(|i| {
+                    let column = projected_schema.target_column_schema(i);
+                    (
+                        Arc::new(expressions::Column::new(&column.name, i))
+                            as Arc<dyn PhysicalExpr>,
+                        column.name.clone(),
+                    )
+                })
+                .collect::<Vec<_>>();
+            Some(exprs)
+        } else {
+            None
+        };
         let opts = ReadOptions {
             deadline,
             read_parallelism,
@@ -256,7 +300,13 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
             priority,
         };
 
-        self.builder.build(request).await
+        let scan = self.builder.build(request).await?;
+        if let Some(expr) = projection_exprs {
+            let plan = ProjectionExec::try_new(expr, scan)?;
+            Ok(Arc::new(plan))
+        } else {
+            Ok(scan)
+        }
     }
 
     fn check_and_build_predicate_from_filters(&self, filters: &[Expr]) -> 
PredicateRef {
@@ -410,7 +460,7 @@ impl ExecutionPlan for ScanTable {
         // However, we have no inputs here, so `UnknownPartitioning` is 
suitable.
         // In datafusion, always set it to `UnknownPartitioning` in the scan 
plan, for
         // example:  
https://github.com/apache/arrow-datafusion/blob/cf152af6515f0808d840e1fe9c63b02802595826/datafusion/core/src/datasource/physical_plan/csv.rs#L175
-        Partitioning::UnknownPartitioning(self.parallelism)
+        Partitioning::UnknownPartitioning(self.parallelism.max(1))
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
@@ -467,9 +517,12 @@ impl ExecutionPlan for ScanTable {
         Some(metric_set)
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(
+        &self,
+    ) -> std::result::Result<datafusion::common::Statistics, 
datafusion::error::DataFusionError>
+    {
         // TODO(yingwen): Implement this
-        Statistics::default()
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
 
@@ -477,10 +530,11 @@ impl DisplayAs for ScanTable {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
         write!(
             f,
-            "ScanTable: table={}, parallelism={}, priority={:?}",
+            "ScanTable: table={}, parallelism={}, priority={:?}, 
partition_count={:?}",
             self.table.name(),
             self.request.opts.read_parallelism,
-            self.request.priority
+            self.request.priority,
+            self.output_partitioning()
         )
     }
 }
@@ -495,3 +549,16 @@ impl fmt::Debug for ScanTable {
             .finish()
     }
 }
+
+fn collect_projection_from_expr(exprs: &[Expr], schema: &Schema) -> 
HashSet<usize> {
+    let mut projections = HashSet::new();
+    exprs.iter().for_each(|expr| {
+        for col_name in visitor::find_columns_by_expr(expr) {
+            if let Some(idx) = schema.index_of(&col_name) {
+                projections.insert(idx);
+            }
+        }
+    });
+
+    projections
+}
diff --git a/src/table_engine/src/table.rs b/src/table_engine/src/table.rs
index 7365ca66..3c611b43 100644
--- a/src/table_engine/src/table.rs
+++ b/src/table_engine/src/table.rs
@@ -421,6 +421,7 @@ impl fmt::Debug for ReadRequest {
             .field("projected", &projected)
             .field("predicate", &predicate)
             .field("priority", &self.priority)
+            .field("projected_schema", &self.projected_schema)
             .finish()
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to