补充代码（文字版）：

/**
 * Minimal reproduction for a suspected ParquetTableSource filter-pushdown problem.
 *
 * Registers a Parquet directory as table `source`. It then registers a filtered
 * projection of it as `t1`, defines a second filtered query `t4` on top of `t1`,
 * prints `t1` as an append stream and executes the job. According to the report
 * in this thread, planning hangs in Calcite's `VolcanoPlanner.findBestExp` until
 * the process runs out of memory. The hang goes away when filter pushdown is
 * removed from ParquetTableSource.
 *
 * @param args optional; `args(0)` overrides the Parquet input directory
 *             (defaults to the reporter's local path)
 */
def main(args: Array[String]): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tableEnv = StreamTableEnvironment.create(env)

  // Avro schema describing the Parquet files. Every field is a union with "null".
  // JSON whitespace is insignificant, so the schema is laid out one field per line.
  val schema =
    """{"type":"record","name":"root","fields":[
      |  {"name":"log_id","type":["null","string"],"default":null},
      |  {"name":"city","type":["null","string"],"default":null},
      |  {"name":"log_from","type":["null","string"],"default":null},
      |  {"name":"ip","type":["null","string"],"default":null},
      |  {"name":"type","type":["null","string"],"default":null},
      |  {"name":"data_source","type":["null","string"],"default":null},
      |  {"name":"is_scan","type":["null","string"],"default":null},
      |  {"name":"result","type":["null","string"],"default":null},
      |  {"name":"timelong","type":["null","long"],"default":null},
      |  {"name":"is_sec","type":["null","string"],"default":null},
      |  {"name":"event_name","type":["null","string"],"default":null},
      |  {"name":"id","type":["null","string"],"default":null},
      |  {"name":"time_string","type":["null","string"],"default":null},
      |  {"name":"device","type":["null","string"],"default":null},
      |  {"name":"timestamp_string","type":["null","string"],"default":null},
      |  {"name":"occur_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},
      |  {"name":"row_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}
      |]}""".stripMargin

  // Input directory: first CLI argument if given, otherwise the original hard-coded path.
  val inputPath: String = args.headOption.getOrElse("/Users/sujun/Documents/tmp/login_data")

  // Schema.parse(String, boolean) is deprecated; Parser#setValidate(true) is its replacement.
  val avroSchema = new org.apache.avro.Schema.Parser().setValidate(true).parse(schema)

  val parquetTableSource: ParquetTableSource = ParquetTableSource
    .builder()
    .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(avroSchema))
    .path(inputPath)
    .build()

  tableEnv.registerTableSource("source", parquetTableSource)

  // Filtered projection over the Parquet source. Per the report, the hang appears
  // when ParquetTableSource filter pushdown is involved.
  val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
  tableEnv.registerTable("t1", t1)

  // NOTE(review): t4 is defined but never emitted; only t1 is printed below.
  // Confirm with the reporter which query is meant to reproduce the hang.
  val t4 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
  t1.toAppendStream[Row].print()

  env.execute()

}


jun su <sujun891...@gmail.com> 于2020年1月8日周三 下午4:59写道:

> 你好：
>        我在使用 ParquetTableSource 时发现了一些问题，疑似是 ParquetTableSource filter
> pushdown 的 bug，以下是代码和描述：
>
> [image: 1578473593933.jpg]
>
> debug 发现，
> 代码卡在了 org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp 方法中，while
> (true) 循环一直无法退出，直到整个程序 OOM。
>
> [image: 1.jpg]
>
> 将 ParquetTableSource 的 filter pushdown 代码去掉后，主程序可以正常执行。
> 怀疑是 Calcite 优化器在迭代寻找代价最小的 plan 时一直无法退出导致的。
>

回复