[ 
https://issues.apache.org/jira/browse/FLINK-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6740:
-------------------------------
    Description: 
When we define UDTF as follows:
{code}
class TableFuncPojo extends TableFunction[TPojo] {
  def eval(age: Int, name:String): Unit = {
    collect(new TPojo(age.toLong,name))
  }
  def eval(age: Date, name:String): Unit = {
     collect(new 
TPojo(org.apache.calcite.runtime.SqlFunctions.toInt(age).toLong,name))
  }
}
{code}

TableAPI:
{code}
 val table = stream.toTable(tEnv,
      'long2, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 
'long.rowtime)
    val windowedTable = table
      .join(udtf('date, 'string) as 'pojo2).select('pojo2)
{code}
We will get the error as following:
{code}
org.apache.flink.table.api.ValidationException: Found multiple 'eval' methods 
which match the signature.

        at 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:180)
        at 
org.apache.flink.table.plan.logical.LogicalTableFunctionCall.validate(operators.scala:700)
        at org.apache.flink.table.api.Table.join(table.scala:539)
        at org.apache.flink.table.api.Table.join(table.scala:328)
        at 
org.apache.flink.table.runtime.datastream.DataStreamAggregateITCase.test1(DataStreamAggregateITCase.scala:84)
{code}

The reason is in {{ parameterTypeEquals }} method, logical as follows:
{code}
candidate == classOf[Date] && (expected == classOf[Int] || expected == 
classOf[JInt]) 
{code}
But sometimes user really need two eval 
So, We should modify the logical of  {{ parameterTypeEquals }} method.
What do you think? Welcome anybody feedback...



  was:
When we define UDTF as follows:
{code}
class TableFuncPojo extends TableFunction[TPojo] {
  def eval(age: Int, name:String): Unit = {
    collect(new TPojo(age.toLong,name))
  }
  def eval(age: Date, name:String): Unit = {
      collect(new TPojo(age.getTime,name))
  }
}
{code}

TableAPI:
{code}
 val table = stream.toTable(tEnv,
      'long2, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 
'long.rowtime)
    val windowedTable = table
      .join(udtf('date, 'string) as 'pojo2).select('pojo2)
{code}
We will get the error as following:
{code}
org.apache.flink.table.api.ValidationException: Found multiple 'eval' methods 
which match the signature.

        at 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:180)
        at 
org.apache.flink.table.plan.logical.LogicalTableFunctionCall.validate(operators.scala:700)
        at org.apache.flink.table.api.Table.join(table.scala:539)
        at org.apache.flink.table.api.Table.join(table.scala:328)
        at 
org.apache.flink.table.runtime.datastream.DataStreamAggregateITCase.test1(DataStreamAggregateITCase.scala:84)
{code}

The reason is in {{ parameterTypeEquals }} method, logical as follows:
{code}
candidate == classOf[Date] && (expected == classOf[Int] || expected == 
classOf[JInt]) 
{code}
TestData:
{code}
val data = List(
    (1L, 1, 1d, 1f, new BigDecimal("1"),
      new Timestamp(200020200),new Date(100101010),new TPojo(1L, "XX"),"Hi"),
    (2L, 2, 2d, 2f, new BigDecimal("2"),
      new Timestamp(200020200),new Date(100101010),new TPojo(1L, "XX"),"Hallo"),
    (3L, 2, 2d, 2f, new BigDecimal("2"),
      new Timestamp(200020200), new Date(100101010),new TPojo(1L, 
"XX"),"Hello"),
    (4L, 5, 5d, 5f, new BigDecimal("5"),
      new Timestamp(200020200), new Date(2334234),new TPojo(2L, "YY"),"Hello"),
    (7L, 3, 3d, 3f, new BigDecimal("3"),
      new Timestamp(200020200), new Date(666333333),new TPojo(1L, 
"XX"),"Hello"),
    (8L, 3, 3d, 3f, new BigDecimal("3"),
      new Timestamp(200020200), new Date(100101010),new TPojo(1L, "XX"),"Hello 
world"),
    (16L, 4, 4d, 4f, new BigDecimal("4"),
      new Timestamp(200020200), new Date(100101010),new TPojo(1L, "XX"),"Hello 
world"))
{code}
But when we only define one `eval` method, we got different result, as follows:

{code}
// for def eval(age: Int, name:String)
Pojo{id=0, name='Hello'}
Pojo{id=1, name='Hallo'}
Pojo{id=1, name='Hello world'}
Pojo{id=1, name='Hello world'}
Pojo{id=1, name='Hello'}
Pojo{id=1, name='Hi'}
Pojo{id=8, name='Hello'}

// for def eval(age: Date, name:String)
Pojo{id=-28800000, name='Hello'}
Pojo{id=57600000, name='Hallo'}
Pojo{id=57600000, name='Hello world'}
Pojo{id=57600000, name='Hello world'}
Pojo{id=57600000, name='Hello'}
Pojo{id=57600000, name='Hi'}
Pojo{id=662400000, name='Hello'}
{code}

So, We should modify the logical of  {{ parameterTypeEquals }} method.
What do you think? Welcome anybody feedback...




> Fix "parameterTypeEquals" method error.
> ---------------------------------------
>
>                 Key: FLINK-6740
>                 URL: https://issues.apache.org/jira/browse/FLINK-6740
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> When we define UDTF as follows:
> {code}
> class TableFuncPojo extends TableFunction[TPojo] {
>   def eval(age: Int, name:String): Unit = {
>     collect(new TPojo(age.toLong,name))
>   }
>   def eval(age: Date, name:String): Unit = {
>      collect(new 
> TPojo(org.apache.calcite.runtime.SqlFunctions.toInt(age).toLong,name))
>   }
> }
> {code}
> TableAPI:
> {code}
>  val table = stream.toTable(tEnv,
>       'long2, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 
> 'long.rowtime)
>     val windowedTable = table
>       .join(udtf('date, 'string) as 'pojo2).select('pojo2)
> {code}
> We will get the error as following:
> {code}
> org.apache.flink.table.api.ValidationException: Found multiple 'eval' methods 
> which match the signature.
>       at 
> org.apache.flink.table.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:180)
>       at 
> org.apache.flink.table.plan.logical.LogicalTableFunctionCall.validate(operators.scala:700)
>       at org.apache.flink.table.api.Table.join(table.scala:539)
>       at org.apache.flink.table.api.Table.join(table.scala:328)
>       at 
> org.apache.flink.table.runtime.datastream.DataStreamAggregateITCase.test1(DataStreamAggregateITCase.scala:84)
> {code}
> The reason is in {{ parameterTypeEquals }} method, logical as follows:
> {code}
> candidate == classOf[Date] && (expected == classOf[Int] || expected == 
> classOf[JInt]) 
> {code}
> But sometimes user really need two eval 
> So, We should modify the logical of  {{ parameterTypeEquals }} method.
> What do you think? Welcome anybody feedback...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to