Hi,

I am afraid you have hit a case that was not checked for or considered.
I think your use case is absolutely valid and should be supported.

The problem, as far as I can tell from an initial investigation, is that
the top-level projection/rename is not being applied. Internally, foo(a)
is passed around as an unnamed expression and should be aliased at the
top level. This aliasing does happen when you simply query the Table,
which is why you get the expected result in the first case, where you
only print the schema of the Table.

When translating to a DataStream, this final rename does not take
place, which IMO is a bug. You can see this behaviour if you add an
additional projection: the renaming of the expression from the lateral
table then happens a level deeper and is not stripped.

    val t1 = tableEnv.sqlQuery(
      """
        |SELECT 1, * FROM (
        |SELECT source_table.a, b FROM source_table
        |, LATERAL TABLE(foo(a)) as T(b))
        |""".stripMargin
    )

    t1 stream schema: Row(EXPR$0: Integer, a: Integer, b: Integer)
    new t1 table schema: root
     |-- EXPR$0: INT
     |-- a: INT
     |-- b: INT


Do you mind creating a JIRA issue to fix/support this case?

Unfortunately, I cannot think of a really good way to retain the column
names. :(
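
One thing that might be worth trying in the meantime is to reference the
generated field by name and alias it back to b when registering the
stream. This is an untested sketch, assuming Flink 1.9.x with the Scala
Table API and assuming the converted stream really reports its fields as
a and f0, as in your output. As far as I understand, 'a, 'b failed because
'a matches an existing field, so the fields are resolved by name and 'b
does not exist. The object name RenameWorkaround and the fromElements
source are made up for the example.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Same table function as in the original mail.
class Foo extends TableFunction[Int] {
  def eval(col: Int): Unit = collect(col * 10)
}

// Hypothetical object name, only for this sketch.
object RenameWorkaround {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Assumption: a fixed in-memory source instead of the socket stream.
    val sourceStream = env.fromElements(1, 2, 3)
    tableEnv.registerDataStream("source_table", sourceStream, 'a)
    tableEnv.registerFunction("foo", new Foo)

    val t1 = tableEnv.sqlQuery(
      """
        |SELECT source_table.a, b FROM source_table
        |, LATERAL TABLE(foo(a)) as T(b)
        |""".stripMargin
    )

    val t1Stream = t1.toAppendStream[Row]

    // Name-based mapping: keep 'a and rename the generated 'f0 back to 'b.
    // Assumption: the stream type is Row(a, f0), as reported in the mail.
    tableEnv.registerDataStream("t1", t1Stream, 'a, 'f0 as 'b)

    println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
  }
}
```

If the alias is accepted, the printed schema should list a and b. This
depends on the generated name f0, so it is a stopgap rather than a real
fix.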

Best,

Dawid

On 28/07/2020 10:26, izual wrote:
> Hi, Community:
>   I ran into some field name errors when converting between Table and
> DataStream.
>   flink version: 1.9.1
>
> First, I create a DataStream, register it as table 'source_table', and
> register a TableFunction named 'foo':
> val sourceStream = env.socketTextStream("127.0.0.1", 8010)
>   .map(line => line.toInt)
> tableEnv.registerDataStream("source_table", sourceStream, 'a)
>
> class Foo() extends TableFunction[(Int)] {
>   def eval(col: Int): Unit = collect((col * 10))
> }
> tableEnv.registerFunction("foo", new Foo)
> Then, I use sqlQuery to generate a new table t1 with columns 'a' and 'b':
> val t1 = tableEnv.sqlQuery(
>   """
>     |SELECT source_table.a, b FROM source_table
>     |, LATERAL TABLE(foo(a)) as T(b)
>     |""".stripMargin
> )
> /* t1 table schema: root
>  |-- a: INT
>  |-- b: INT
>  */
> println(s"t1 table schema: ${t1.getSchema}")
> When I convert 't1' to a DataStream and then register it as a new
> table (for some reason) named 't1', the columns change to 'a' and 'f0',
> not 'a' and 'b'.
> I can find 'f0' only in the Java API section of Refs [1].
> val t1Stream = t1.toAppendStream[Row]
> // t1 stream schema: Row(a: Integer, f0: Integer)
> println(s"t1 stream schema: ${t1Stream.getType()}")
> tableEnv.registerDataStream("t1", t1Stream)
> /* new t1 table schema: root
>  |-- a: INT
>  |-- f0: INT
>  */
> println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
> Thinking that maybe the default TypeExtractor(?) does not work well here,
> I then tried to set the field names explicitly, but that failed too:
> tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
> If I add a proctime attribute when registering the source, this works
> well, but I do not want to set a proctime that is unused:
> tableEnv.registerDataStream("source_table", sourceStream, 'a, 'proctime.proctime)
>
> My questions are:
> 1. Why does the code above behave a little strangely?
> 2. How can I retain 'b' when converting between Table and DataStream
> frequently?
>
> Refs:
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#register-a-datastream-or-dataset-as-table
>
> Thanks for your reply.
