haohuaijin opened a new issue, #18602:
URL: https://github.com/apache/datafusion/issues/18602
### Describe the bug
`
decode proto: DecodeError { description: "recursion limit reached", stack:
[("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"),
("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"),
("PhysicalBinaryExprNode", "l"), ("PhysicalEx
prNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode",
"expr_type"), ("FilterExecNode", "expr"), ("PhysicalPlanNode",
"physical_plan_type"), ("CoalesceBatchesExecNode", "input"),
("PhysicalPlanNode", "physical_plan_type"), ("Re
partitionExecNode", "input"), ("PhysicalPlanNode", "physical_plan_type"),
("AggregateExecNode", "input"), ("PhysicalPlanNode", "physical_plan_type"),
("RepartitionExecNode", "input"), ("PhysicalPlanNode", "physical_plan_type"),
("CoalesceBatchesExecNode", "input"), ("PhysicalPlanNode",
"physical_plan_type"), ("AggregateExecNode", "input"), ("PhysicalPlanNode",
"physical_plan_type"), ("ProjectionExecNode", "input"), ("PhysicalPlanNode",
"physical_plan_type"), ("SortExecNode", "input"), ("PhysicalPlanNode",
"physical_plan_type"), ("SortPreservingMergeExecNode", "input"),
("PhysicalPlanNode", "physical_plan_type")] }
`
### To Reproduce
```rust
use arrow::array::StringArray;
use bytes::BytesMut;
use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_proto::physical_plan::{AsExecutionPlan,
DefaultPhysicalExtensionCodec};
use datafusion_proto::protobuf;
use prost::Message;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<()> {
let mem_table = create_memtable()?;
let ctx = SessionContext::new();
ctx.register_table("netflow", Arc::new(mem_table))?;
let sql = "SELECT
dst_geo_country_name AS x_axis_1,
dst_geo_city_name AS x_axis_2,
sum(packets) AS y_axis_1
FROM
netflow
WHERE
dst_geo_country_name IS NOT NULL
AND src_addr NOT LIKE '10.201.%'
AND dst_addr NOT LIKE '10.201.%'
AND src_addr NOT LIKE '10.202.%'
AND dst_addr NOT LIKE '10.202.%'
AND src_addr NOT LIKE '10.203.%'
AND dst_addr NOT LIKE '10.203.%'
AND src_addr NOT LIKE '10.204.%'
AND dst_addr NOT LIKE '10.204.%'
AND src_addr NOT LIKE '172.16.186.%'
AND dst_addr NOT LIKE '172.16.186.%'
AND src_addr NOT LIKE '172.16.187.%'
AND dst_addr NOT LIKE '172.16.187.%'
AND src_addr NOT LIKE '172.16.188.%'
AND dst_addr NOT LIKE '172.16.188.%'
AND src_addr NOT LIKE '10.102.45.%'
AND dst_addr NOT LIKE '10.102.45.%'
AND src_addr NOT LIKE '172.25.210.%'
AND dst_addr NOT LIKE '172.25.210.%'
AND src_addr NOT LIKE '172.25.211.%'
AND dst_addr NOT LIKE '172.25.211.%'
AND src_addr NOT LIKE '141.226.101.%'
AND dst_addr NOT LIKE '141.226.101.%'
AND src_addr NOT LIKE '167.86.40.%'
AND dst_addr NOT LIKE '167.86.40.%'
AND src_addr NOT LIKE '66.22.38.%'
AND dst_addr NOT LIKE '66.22.38.%'
AND src_addr != '168.143.191.55'
AND dst_addr != '168.143.191.55'
AND src_addr != '82.112.107.142'
AND dst_addr != '82.112.107.142'
AND src_addr != '20.76.39.176'
AND dst_addr != '20.76.39.176'
AND src_addr != '162.159.129.83'
AND dst_addr != '162.159.129.83'
AND src_addr != '34.201.223.155'
AND dst_addr != '34.201.223.155'
AND src_addr != '34.201.223.156'
AND dst_addr != '34.201.223.156'
AND src_addr != '34.201.223.157'
AND dst_addr != '34.201.223.157'
AND src_addr != '134.201.223.157'
AND dst_addr != '134.201.223.157'
AND src_addr != '341.201.223.157'
AND dst_addr != '341.201.223.157'
GROUP BY
x_axis_1, x_axis_2 ORDER BY y_axis_1 DESC LIMIT 20";
let codec = DefaultPhysicalExtensionCodec {};
let plan = ctx.sql(sql).await?.create_physical_plan().await?;
let proto: protobuf::PhysicalPlanNode =
protobuf::PhysicalPlanNode::try_from_physical_plan(plan.clone(),
&codec)
.expect("to proto");
let mut buffer = BytesMut::new();
proto.encode(&mut buffer).expect("encode proto");
let proto = protobuf::PhysicalPlanNode::decode(buffer).expect("decode
proto");
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
.try_into_physical_plan(&ctx.task_ctx(), &codec)
.expect("from proto");
assert_eq!(format!("{plan:?}"), format!("{result_exec_plan:?}"));
Ok(())
}
fn create_memtable() -> Result<MemTable> {
MemTable::try_new(get_schema(), vec![vec![create_record_batch()?]])
}
fn create_record_batch() -> Result<RecordBatch> {
let dst_geo_country_name_array = StringArray::from(vec!["USA"]);
let dst_geo_city_name_array = StringArray::from(vec!["New York"]);
let packets_array = UInt64Array::from(vec![100]);
let src_addr_array = StringArray::from(vec!["192.168.1.1"]);
let dst_addr_array = StringArray::from(vec!["192.168.1.2"]);
Ok(RecordBatch::try_new(
get_schema(),
vec![
Arc::new(dst_geo_country_name_array),
Arc::new(dst_geo_city_name_array),
Arc::new(packets_array),
Arc::new(src_addr_array),
Arc::new(dst_addr_array),
],
)
.unwrap())
}
fn get_schema() -> SchemaRef {
SchemaRef::new(Schema::new(vec![
Field::new("dst_geo_country_name", DataType::Utf8, false),
Field::new("dst_geo_city_name", DataType::Utf8, false),
Field::new("packets", DataType::UInt64, true),
Field::new("src_addr", DataType::Utf8, false),
Field::new("dst_addr", DataType::Utf8, false),
]))
}
```
### Expected behavior
decode succuess
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]