[ 
https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957214#comment-15957214
 ] 

ASF GitHub Bot commented on FLINK-6196:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3623#discussion_r109965960
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
 ---
    @@ -109,6 +110,133 @@ class TableFunc3(data: String, conf: Map[String, 
String]) extends TableFunction[
       }
     }
     
    +class DynamicSchema extends TableFunction[Row] {
    +
    +  def eval(str: String, column: Int): Unit = {
    +    if (str.contains("#")) {
    +      str.split("#").foreach({ s =>
    +        val row = new Row(column)
    +        row.setField(0, s)
    +        var i = 0
    +        for (i <- 1 until column) {
    +          row.setField(i, s.length)
    +        }
    +        collect(row)
    +      })
    +    }
    +  }
    +
    +  override def getResultType(arguments: java.util.List[AnyRef]): 
TypeInformation[Row] = {
    +    val column = arguments.get(1).asInstanceOf[Int]
    +    val basicTypeInfos = new Array[TypeInformation[_]](column)
    +    basicTypeInfos(0) = BasicTypeInfo.STRING_TYPE_INFO
    +    for (i <- 1 until column) {
    +      basicTypeInfos(i) = BasicTypeInfo.INT_TYPE_INFO
    +    }
    +    new RowTypeInfo(basicTypeInfos: _*)
    +  }
    +}
    +
    +class DynamicSchema0 extends TableFunction[Row] {
    +
    +  def eval(str: String, cols: String): Unit = {
    +    val columns = cols.split(",")
    +
    +    if (str.contains("#")) {
    +      str.split("#").foreach({ s =>
    +        val row = new Row(columns.length)
    +        row.setField(0, s)
    +        for (i <- 1 until columns.length) {
    +          if (columns(i).equals("string")) {
    +            row.setField(i, s.length.toString)
    +          } else if (columns(i).equals("int")) {
    +            row.setField(i, s.length)
    +          }
    +        }
    +        collect(row)
    +      })
    +    }
    +  }
    +
    +  override def getResultType(arguments: java.util.List[AnyRef]): 
TypeInformation[Row] = {
    +    val columnStr = arguments.get(1).asInstanceOf[String]
    +    val columns = columnStr.split(",")
    +
    +    val basicTypeInfos = new Array[TypeInformation[_]](columns.length)
    +    for (i <- 0 until columns.length) {
    +      if (columns(i).equals("string")) {
    +        basicTypeInfos(i) = BasicTypeInfo.STRING_TYPE_INFO
    +      } else if (columns(i).equals("int")) {
    +        basicTypeInfos(i) = BasicTypeInfo.INT_TYPE_INFO
    +      }
    +    }
    +    new RowTypeInfo(basicTypeInfos: _*)
    +  }
    +}
    +
    +class DynamicSchemaWithRexNodes extends TableFunction[Row] {
    +
    +  def eval(str: String, i: Int, si: Int, bi: Int, flt: Double, real: 
Double, d: Double, b: Boolean):
    +  Unit = {
    +    val row = new Row(8)
    +    row.setField(0, str)
    +    row.setField(1, i)
    +    row.setField(2, si)
    +    row.setField(3, bi)
    +    row.setField(4, flt)
    +    row.setField(5, real)
    +    row.setField(6, d)
    +    row.setField(7, b)
    +    collect(row)
    +  }
    +
    +  override def getResultType(arguments: util.List[AnyRef]): 
TypeInformation[Row] = {
    --- End diff --
    
    Can we add a more non primitive types like `Timestamp`?


> Support dynamic schema in Table Function
> ----------------------------------------
>
>                 Key: FLINK-6196
>                 URL: https://issues.apache.org/jira/browse/FLINK-6196
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Zhuoluo Yang
>            Assignee: Zhuoluo Yang
>
> In many of our use cases. We have to decide the schema of a UDTF at the run 
> time. For example. udtf('c1, c2, c3') will generate three columns for a 
> lateral view. 
> Most systems such as calcite and hive support this feature. However, the 
> current implementation of flink didn't implement the feature correctly.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to