I created a JIRA issue here: https://issues.apache.org/jira/browse/FLINK-18782
And thanks for your advice on avoiding the 「top-level projection/rename」 issue ^_^

At 2020-07-30 16:58:45, "Dawid Wysakowicz" <dwysakow...@apache.org> wrote:

Hi,

I am afraid you have hit a case that was not checked for or considered. I 
think your use case is absolutely valid and should be supported.

The problem, as far as I can tell from an initial investigation, is that the 
top-level projection/rename is not being applied. Internally, foo(a) is passed 
around as an unnamed expression and should be aliased at the top level. This 
aliasing does happen when you simply query the table, which is why you get the 
expected result in the first case, where you only print the schema of the Table.

When translating to a DataStream, this final rename does not take place, which 
IMO is a bug. You can see this behaviour if you add an additional projection: 
the renaming of the expression from the lateral table then happens one level 
deeper and is not stripped.


    val t1 = tableEnv.sqlQuery(
      """
        |SELECT 1, * FROM (
        |SELECT source_table.a, b FROM source_table
        |, LATERAL TABLE(foo(a)) as T(b))
        |""".stripMargin
    )

    t1 stream schema: Row(EXPR$0: Integer, a: Integer, b: Integer)
    new t1 table schema: root
     |-- EXPR$0: INT
     |-- a: INT
     |-- b: INT
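Building on this extra projection, the leftover EXPR$0 column can probably be dropped again when the stream is registered back as a table. The sketch below is not from the thread; it assumes Flink 1.9 with the default planner, replaces the socket source with a bounded `env.fromElements(1, 2, 3)` so it runs standalone, and relies on the name-based field mapping that the 1.9 Table API documentation describes for composite types such as Row: because `'a` and `'b` both exist in the stream's row type, fields are looked up by name and the unlisted EXPR$0 is projected out. The object and method names (`DropDummyColumn`, `registeredFieldNames`) are hypothetical.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Same table function as in the original question: emits col * 10.
class Foo extends TableFunction[Int] {
  def eval(col: Int): Unit = collect(col * 10)
}

object DropDummyColumn {
  // Hypothetical helper: returns the column names of the re-registered table "t1".
  def registeredFieldNames(): Seq[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Bounded stand-in for the socket source (assumption, for a standalone run).
    val sourceStream = env.fromElements(1, 2, 3)
    tableEnv.registerDataStream("source_table", sourceStream, 'a)
    tableEnv.registerFunction("foo", new Foo)

    // Extra projection: the lateral-table alias is applied one level deeper.
    val t1 = tableEnv.sqlQuery(
      """
        |SELECT 1, * FROM (
        |SELECT source_table.a, b FROM source_table
        |, LATERAL TABLE(foo(a)) as T(b))
        |""".stripMargin
    )

    // Reported type: Row(EXPR$0: Integer, a: Integer, b: Integer).
    val t1Stream = t1.toAppendStream[Row]

    // 'a and 'b both exist in the row type, so the mapping is by name
    // and the dummy EXPR$0 column is left out of the new table.
    tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
    tableEnv.scan("t1").getSchema.getFieldNames.toSeq
  }

  def main(args: Array[String]): Unit =
    println(registeredFieldNames())
}
```

Only the schema is inspected here, so no `env.execute()` call is needed.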




Do you mind creating a JIRA issue to fix/support this case?


Unfortunately, I cannot think of a really good way to retain the column 
names. :(

Best,

Dawid


On 28/07/2020 10:26, izual wrote:

Hi, Community:
  I ran into some field-name errors when converting between Table and DataStream.
  Flink version: 1.9.1


First, I initialize a DataStream, register it as the table 'source_table', and 
register a TableFunction named 'foo':
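import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val sourceStream = env.socketTextStream("127.0.0.1", 8010)
  .map(line => line.toInt)
tableEnv.registerDataStream("source_table", sourceStream, 'a)

// Emits one row per input value, containing col * 10.
class Foo() extends TableFunction[Int] {
  def eval(col: Int): Unit = collect(col * 10)
}
tableEnv.registerFunction("foo", new Foo)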
Then, I use sqlQuery to generate a new table t1 with columns 'a' and 'b':
val t1 = tableEnv.sqlQuery(
  """
    |SELECT source_table.a, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin
)
/*
 t1 table schema: root
 |-- a: INT
 |-- b: INT
 */
println(s"t1 table schema: ${t1.getSchema}")
When I convert 't1' to a DataStream and then register it (for other reasons) 
as a new table named 't1', the columns change to 'a' and 'f0' instead of 'a' 
and 'b'. The only place I can find 'f0' mentioned is the Java API example in [1].
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream)
/*
new t1 table schema: root
|-- a: INT
|-- f0: INT
 */
println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
Suspecting that the default TypeExtractor(?) does not work well here, I then 
tried to set the field names explicitly, but that failed too:
tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
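A plausible reason the explicit names fail, based on the 1.9 Table API documentation on mapping data types to a table schema: for a composite type such as Row, fields are mapped by position only if none of the given names already exist in the input type. Here `'a` does exist in Row(a, f0), so the names are resolved by name and `'b` cannot be found. Under that assumption, a minimal sketch of a workaround stays in name-based mode and renames `f0` with an alias. It assumes the default planner, a bounded stand-in source, and that the converted stream has the type Row(a, f0) reported above; `RenameF0` and `registeredFieldNames` are hypothetical names.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Same table function as in the question: emits col * 10.
class Foo extends TableFunction[Int] {
  def eval(col: Int): Unit = collect(col * 10)
}

object RenameF0 {
  // Hypothetical helper: returns the column names of the re-registered table "t1".
  def registeredFieldNames(): Seq[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Bounded stand-in for the socket source (assumption, for a standalone run).
    val sourceStream = env.fromElements(1, 2, 3)
    tableEnv.registerDataStream("source_table", sourceStream, 'a)
    tableEnv.registerFunction("foo", new Foo)

    val t1 = tableEnv.sqlQuery(
      """
        |SELECT source_table.a, b FROM source_table
        |, LATERAL TABLE(foo(a)) as T(b)
        |""".stripMargin
    )

    // Reported type: Row(a: Integer, f0: Integer).
    val t1Stream = t1.toAppendStream[Row]

    // 'a exists in the row type, so fields are mapped by name;
    // the alias renames the generated f0 column back to b.
    tableEnv.registerDataStream("t1", t1Stream, 'a, 'f0 as 'b)
    tableEnv.scan("t1").getSchema.getFieldNames.toSeq
  }

  def main(args: Array[String]): Unit =
    println(registeredFieldNames())
}
```

The alias depends on the generated name f0, so it may stop working once FLINK-18782 is fixed; the extra projection suggested in the reply does not rely on that name.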
If I add a proctime attribute to source_table first, this works, but I do not 
want to declare a proctime attribute that is never used:
tableEnv.registerDataStream("source_table", sourceStream, 'a, 'proctime.proctime)


My questions are:
1. Why does the code above behave this way?
2. How can I retain the column name 'b' when converting frequently between Table and DataStream?


Refs:
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#register-a-datastream-or-dataset-as-table


Thanks for your reply.