haohuaijin opened a new issue, #18602:
URL: https://github.com/apache/datafusion/issues/18602

   ### Describe the bug
   
   `
   decode proto: DecodeError { description: "recursion limit reached", stack: 
[("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), ("PhysicalBinaryExprNode", "l"), 
("PhysicalExprNode", "expr_type"), 
 ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", "expr_type"), 
("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("PhysicalBinaryExprNode", "l"), ("PhysicalExprNode", 
"expr_type"), ("FilterExecNode", "expr"), ("PhysicalPlanNode", 
"physical_plan_type"), ("CoalesceBatchesExecNode", "input"), 
("PhysicalPlanNode", "physical_plan_type"), ("Re
 partitionExecNode", "input"), ("PhysicalPlanNode", "physical_plan_type"), 
("AggregateExecNode", "input"), ("PhysicalPlanNode", "physical_plan_type"), 
("RepartitionExecNode", "input"), ("PhysicalPlanNode", "physical_plan_type"), 
("CoalesceBatchesExecNode", "input"), ("PhysicalPlanNode", 
"physical_plan_type"), ("AggregateExecNode", "input"), ("PhysicalPlanNode", 
"physical_plan_type"), ("ProjectionExecNode", "input"), ("PhysicalPlanNode", 
"physical_plan_type"), ("SortExecNode", "input"), ("PhysicalPlanNode", 
"physical_plan_type"), ("SortPreservingMergeExecNode", "input"), 
("PhysicalPlanNode", "physical_plan_type")] }
   `
   
   ### To Reproduce
   
   ```rust
   use arrow::array::StringArray;
   use bytes::BytesMut;
   use datafusion::arrow::array::UInt64Array;
   use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::datasource::MemTable;
   use datafusion::error::Result;
   use datafusion::physical_plan::ExecutionPlan;
   use datafusion::prelude::*;
   use datafusion_proto::physical_plan::{AsExecutionPlan, 
DefaultPhysicalExtensionCodec};
   use datafusion_proto::protobuf;
   use prost::Message;
   use std::sync::Arc;
   
   // Reproduction driver: plans a query with a long chain of `AND` predicates,
   // round-trips the physical plan through its protobuf encoding, and checks that
   // the decoded plan has the same `Debug` rendering as the original.
   #[tokio::main]
   async fn main() -> Result<()> {
       // One-row in-memory table that the query below runs against.
       let mem_table = create_memtable()?;
   
       let ctx = SessionContext::new();
   
       ctx.register_table("netflow", Arc::new(mem_table))?;
   
       let sql = "SELECT
     dst_geo_country_name AS x_axis_1,
     dst_geo_city_name AS x_axis_2,
     sum(packets) AS y_axis_1
   FROM
     netflow
   WHERE
     dst_geo_country_name IS NOT NULL 
     AND src_addr NOT LIKE '10.201.%' 
     AND dst_addr NOT LIKE '10.201.%' 
     AND src_addr NOT LIKE '10.202.%' 
     AND dst_addr NOT LIKE '10.202.%' 
     AND src_addr NOT LIKE '10.203.%' 
     AND dst_addr NOT LIKE '10.203.%' 
     AND src_addr NOT LIKE '10.204.%' 
     AND dst_addr NOT LIKE '10.204.%' 
     AND src_addr NOT LIKE '172.16.186.%' 
     AND dst_addr NOT LIKE '172.16.186.%' 
     AND src_addr NOT LIKE '172.16.187.%' 
     AND dst_addr NOT LIKE '172.16.187.%' 
     AND src_addr NOT LIKE '172.16.188.%' 
     AND dst_addr NOT LIKE '172.16.188.%' 
     AND src_addr NOT LIKE '10.102.45.%' 
     AND dst_addr NOT LIKE '10.102.45.%' 
     AND src_addr NOT LIKE '172.25.210.%' 
     AND dst_addr NOT LIKE '172.25.210.%' 
     AND src_addr NOT LIKE '172.25.211.%' 
     AND dst_addr NOT LIKE '172.25.211.%' 
     AND src_addr NOT LIKE '141.226.101.%' 
     AND dst_addr NOT LIKE '141.226.101.%' 
     AND src_addr NOT LIKE '167.86.40.%' 
     AND dst_addr NOT LIKE '167.86.40.%' 
     AND src_addr NOT LIKE '66.22.38.%' 
     AND dst_addr NOT LIKE '66.22.38.%' 
     AND src_addr != '168.143.191.55' 
     AND dst_addr != '168.143.191.55' 
     AND src_addr != '82.112.107.142' 
     AND dst_addr != '82.112.107.142' 
     AND src_addr != '20.76.39.176' 
     AND dst_addr != '20.76.39.176' 
     AND src_addr != '162.159.129.83' 
     AND dst_addr != '162.159.129.83' 
     AND src_addr != '34.201.223.155' 
     AND dst_addr != '34.201.223.155' 
     AND src_addr != '34.201.223.156' 
     AND dst_addr != '34.201.223.156' 
     AND src_addr != '34.201.223.157' 
     AND dst_addr != '34.201.223.157' 
     AND src_addr != '134.201.223.157' 
     AND dst_addr != '134.201.223.157' 
     AND src_addr != '341.201.223.157' 
     AND dst_addr != '341.201.223.157'
   GROUP BY
     x_axis_1, x_axis_2 ORDER BY y_axis_1 DESC LIMIT 20";
   
       // Plan the query, then convert the physical plan into its protobuf node.
       let codec = DefaultPhysicalExtensionCodec {};
       let plan = ctx.sql(sql).await?.create_physical_plan().await?;
       let proto: protobuf::PhysicalPlanNode =
           protobuf::PhysicalPlanNode::try_from_physical_plan(plan.clone(), 
&codec)
               .expect("to proto");
   
       // Serialize to bytes and parse them back. The "decode proto" expectation
       // is the step the report shows failing with `recursion limit reached`.
       let mut buffer = BytesMut::new();
       proto.encode(&mut buffer).expect("encode proto");
       let proto = protobuf::PhysicalPlanNode::decode(buffer).expect("decode 
proto");
   
       // Rebuild an execution plan from the decoded protobuf node.
       let result_exec_plan: Arc<dyn ExecutionPlan> = proto
           .try_into_physical_plan(&ctx.task_ctx(), &codec)
           .expect("from proto");
   
       // The round trip is considered lossless when both plans debug-print the same.
       assert_eq!(format!("{plan:?}"), format!("{result_exec_plan:?}"));
   
       Ok(())
   }
   
   /// Creates the in-memory table used by the reproduction: a single partition
   /// holding the one batch from `create_record_batch`, typed by `get_schema`.
   fn create_memtable() -> Result<MemTable> {
       let schema = get_schema();
       let only_batch = create_record_batch()?;
       // Outer vec = partitions, inner vec = batches within that partition.
       let partitions = vec![vec![only_batch]];
       MemTable::try_new(schema, partitions)
   }
   
   /// Builds the single-row `RecordBatch` that backs the test table.
   ///
   /// Columns are supplied in the same order as the fields of `get_schema()`.
   ///
   /// # Errors
   ///
   /// Returns the error from `RecordBatch::try_new` if the columns do not match
   /// the schema.
   fn create_record_batch() -> Result<RecordBatch> {
       let dst_geo_country_name_array = StringArray::from(vec!["USA"]);
       let dst_geo_city_name_array = StringArray::from(vec!["New York"]);
       let packets_array = UInt64Array::from(vec![100]);
       let src_addr_array = StringArray::from(vec!["192.168.1.1"]);
       let dst_addr_array = StringArray::from(vec!["192.168.1.2"]);
   
       // Propagate a construction failure with `?` rather than panicking via
       // `.unwrap()`: this function already returns `Result`, and the Arrow
       // error converts into the DataFusion error type.
       let batch = RecordBatch::try_new(
           get_schema(),
           vec![
               Arc::new(dst_geo_country_name_array),
               Arc::new(dst_geo_city_name_array),
               Arc::new(packets_array),
               Arc::new(src_addr_array),
               Arc::new(dst_addr_array),
           ],
       )?;
       Ok(batch)
   }
   
   /// Returns the shared schema of the `netflow` test table: four non-nullable
   /// `Utf8` columns plus a nullable `UInt64` `packets` column in third position.
   fn get_schema() -> SchemaRef {
       // Every string column in this table is a non-nullable Utf8 field.
       let required_utf8 = |name: &str| Field::new(name, DataType::Utf8, false);
   
       let fields = vec![
           required_utf8("dst_geo_country_name"),
           required_utf8("dst_geo_city_name"),
           Field::new("packets", DataType::UInt64, true),
           required_utf8("src_addr"),
           required_utf8("dst_addr"),
       ];
   
       SchemaRef::new(Schema::new(fields))
   }
   
   ```
   
   ### Expected behavior
   
   The decode succeeds.
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to