Github user AlexanderKoltsov commented on the issue:

    https://github.com/apache/flink/pull/6287
  
    The only thing I did not do is add a unit test for the stream environment.
    When I ran such a test, I got an exception.
    
    ### Test in org.apache.flink.table.runtime.stream.sql package:
        @Test
        def testValuesWithCast(): Unit = {
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          val tEnv = TableEnvironment.getTableEnvironment(env)
          StreamITCase.clear
      
          val sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
            "(2, cast(2 as BIGINT))," +
            "(3, cast(3 as BIGINT))"
      
          val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
          results.addSink(new StreamITCase.RetractingSink)
          env.execute()
      
          // one expected string per result row
          val expected = Seq("1,1", "2,2", "3,3")
          assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
        }
    
    ### Output:
    org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
    
    FlinkLogicalUnion(all=[true])
      FlinkLogicalCalc(expr#0=[{inputs}], expr#1=[1], expr#2=[1], EXPR$0=[$t1], EXPR$1=[$t2])
        FlinkLogicalValues(tuples=[[{ 0 }]])
      FlinkLogicalCalc(expr#0=[{inputs}], expr#1=[2], expr#2=[2], EXPR$0=[$t1], EXPR$1=[$t2])
        FlinkLogicalValues(tuples=[[{ 0 }]])
      FlinkLogicalCalc(expr#0=[{inputs}], expr#1=[3], expr#2=[3], EXPR$0=[$t1], EXPR$1=[$t2])
        FlinkLogicalValues(tuples=[[{ 0 }]])
    
    This exception indicates that the query uses an unsupported SQL feature.
    Please check the documentation for the set of currently supported SQL features.
    
        at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
        at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:731)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:778)
        at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:254)
        at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:234)
        at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:189)

