[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2653




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90583213
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableFunctionCall.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.{Expression, UnresolvedFieldReference}
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.getFieldInfo
+import org.apache.flink.api.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
+
+
+/**
+  * A [[TableFunctionCall]] represents a call to a [[TableFunction]] with actual parameters.
+  *
+  * For Scala users, Flink implicitly converts a [[TableFunction]] into a [[TableFunctionCall]].
+  * For Java users, Flink parses a string expression into a [[TableFunctionCall]]. So users never
+  * need to create a [[TableFunctionCall]] manually.
+  *
+  * @param functionName function name
+  * @param tableFunction user-defined table function
+  * @param parameters actual parameters of function
+  * @param resultType type information of returned table
+  */
+case class TableFunctionCall(
+functionName: String,
+tableFunction: TableFunction[_],
+parameters: Seq[Expression],
+resultType: TypeInformation[_]) {
+
+  private var aliases: Option[Seq[Expression]] = None
+
+  /**
+    * Assigns aliases to the fields returned by this table function, so that the following
+    * `select()` clause can refer to them.
+    *
+    * @param aliasList aliases for the fields returned by this table function
+    * @return this table function call
+    */
+  def as(aliasList: Expression*): TableFunctionCall = {
+this.aliases = Some(aliasList)
+this
+  }
+
+  /**
+* Converts an API class to a logical node for planning.
+*/
+  private[flink] def toLogicalTableFunctionCall(child: LogicalNode): LogicalTableFunctionCall = {
+val originNames = getFieldInfo(resultType)._1
+
+// determine the final field names
+val fieldNames = if (aliases.isDefined) {
+  val aliasList = aliases.get
+  if (aliasList.length != originNames.length) {
+        throw ValidationException(
+          s"List of column aliases must have same degree as table; " +
+            s"the returned table of function '$functionName' has ${originNames.length} " +
+            s"columns (${originNames.mkString(",")}), " +
+            s"whereas alias list has ${aliasList.length} columns")
+      } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+        throw ValidationException("Alias only accepts name expressions as arguments")
+      } else {
+        aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray
+      }
+} else {
+  originNames
+}
+
+LogicalTableFunctionCall(
+  functionName,
+  tableFunction,
+  parameters,
+  resultType,
+  fieldNames,
+  child)
+  }
+}
+
+
+case class TableFunctionCallBuilder[T: TypeInformation](udtf: TableFunction[T]) {
--- End diff --

I would move this into `ExpressionUtils`, learned from your new [PR](https://github.com/apache/flink/pull/2919/files#diff-04d1bca648d7ee47ab9ce787c8d944a6R567) 😄




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90582259
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import java.util
+
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.ValidationException
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined table function works on
+  * zero, one, or multiple scalar values as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
+  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
+  * If a user-defined table function should not introduce much overhead during runtime, it is
+  * recommended to declare parameters and result types as primitive types instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * Example:
+  *
+  * {{{
+  *
+  *   public class Split extends TableFunction<String> {
+  *
+  *     // implement an "eval" method with as many parameters as you want
+  *     public void eval(String str) {
+  *       for (String s : str.split(" ")) {
+  *         collect(s);   // use collect(...) to emit an output row
+  *       }
+  *     }
+  *
+  *     // eval methods can be overloaded here ...
+  *   }
+  *
+  *   val tEnv: TableEnvironment = ...
+  *   val table: Table = ...   // schema: [a: String]
+  *
+  *   // for Scala users
+  *   val split = new Split()
+  *   table.crossApply(split('a) as ('s)).select('a, 's)
+  *
+  *   // for Java users
+  *   tEnv.registerFunction("split", new Split())   // register table function first
+  *   table.crossApply("split(a) as (s)").select("a, s")
+  *
+  *   // for SQL users
+  *   tEnv.registerFunction("split", new Split())   // register table function first
+  *   tEnv.sql("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
+  *
+  * }}}
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] {
+
+  private val rows: util.ArrayList[T] = new util.ArrayList[T]()
+
+  /**
+* Emit an output row.
+*
+* @param row the output row
+*/
+  protected def collect(row: T): Unit = {
+// cache rows for now, maybe immediately process them further
+rows.add(row)
+  }
+
+  /**
+* Internal use. Get an iterator of the buffered rows.
+*/
+  def getRowsIterator = rows.iterator()
+
+  /**
+* Internal use. Clear buffered rows.
+*/
+  def clear() = rows.clear()
+
+  // ------------------------------------------------------------------
+
+  /**
+    * Returns the result type of the evaluation method with a given signature.
+    *
+    * This method needs to be overridden in case 

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90479434
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---
@@ -451,6 +452,28 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 }
   }
 
+  lazy val tableFunctionCall: PackratParser[LogicalNode] =
+    functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+      case name ~ _ ~ args ~ _ => UnresolvedTableFunctionCall(name.toUpperCase, args)
--- End diff --

Hmm, I think it would be nice to have as few Strings in the Scala Table API as possible. Ideally, we only need a parser for the Java Table API. Couldn't we convert the TableFunction into a function call and extend AS to support multiple names?
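
For illustration, a minimal sketch of the Scala-only variant described here, assuming a hypothetical multi-name `as` on the table function call (this is the proposal being discussed, not the API of this PR):

    // hypothetical: `as` accepting several field names, so the Scala
    // Table API needs no string parsing at all
    val split = new Split()
    table
      .crossApply(split('c) as ('word, 'frequency))
      .select('a, 'word, 'frequency)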




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90478025
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -334,18 +337,33 @@ class CodeGenerator(
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
     val input1AccessExprs = for (i <- 0 until input1.getArity)
-      yield generateInputAccess(input1, input1Term, i)
+      yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
 
     val input2AccessExprs = input2 match {
       case Some(ti) => for (i <- 0 until ti.getArity)
-        yield generateInputAccess(ti, input2Term, i)
+        yield generateInputAccess(ti, input2Term, i, input2PojoFieldMapping)
       case None => Seq() // add nothing
     }
 
     generateResultExpression(input1AccessExprs ++ input2AccessExprs, returnType, resultFieldNames)
   }
 
   /**
+    * Generates an expression from the left input and the right table function.
+    */
+  def generateCorrelateAccessExprs: (Seq[GeneratedExpression], Seq[GeneratedExpression]) = {
+    val input1AccessExprs = for (i <- 0 until input1.getArity)
+      yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
+
+    val input2AccessExprs = input2 match {
+      case Some(ti) => for (i <- 0 until ti.getArity)
+        yield generateFieldAccess(ti, input2Term, i, input2PojoFieldMapping)
--- End diff --

Because `generateInputAccess` puts the field access code into `reusableInputUnboxingExprs`, it is forced to the top of the function body. But the table function's output is an Iterable, so we need to access the fields inside a while loop. That's why we use `generateFieldAccess` to manually emit the field access code.
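
A rough sketch of the generated shape this describes (assumed, simplified output; `udtf` stands for the reusable function instance): the `reusableInputUnboxingExprs` section is emitted once at the top of the function body, whereas the UDTF fields must be read once per row inside the loop:

    // simplified sketch of the generated correlate body
    val generatedBodySketch =
      """
        |// input unboxing section: emitted once, at the top
        |java.util.Iterator iter = udtf.getRowsIterator();
        |while (iter.hasNext()) {
        |  Object row = iter.next();
        |  // generateFieldAccess(...) output goes here: per field, per row
        |}
        |""".stripMargin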




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90052039
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
+
+/**
+  * Generates a call to user-defined [[TableFunction]].
+  *
+  * @param tableFunction user-defined [[TableFunction]] that might be overloaded
+  * @param signature actual signature with which the function is called
+  * @param returnType actual return type required by the surrounding
+  */
+class TableFunctionCallGen(
+tableFunction: TableFunction[_],
+signature: Seq[TypeInformation[_]],
+returnType: TypeInformation[_])
+  extends CallGenerator {
+
+  override def generate(
+  codeGenerator: CodeGenerator,
+  operands: Seq[GeneratedExpression])
+: GeneratedExpression = {
+// determine function signature
+    val matchingSignature = getSignature(tableFunction.getClass, signature)
+      .getOrElse(throw new CodeGenException("No matching signature found."))
+
+// convert parameters for function (output boxing)
+    val parameters = matchingSignature
+      .zip(operands)
+      .map { case (paramClass, operandExpr) =>
+        if (paramClass.isPrimitive) {
+          operandExpr
+        } else {
+          val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
+          val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)
+          val exprOrNull: String = if (codeGenerator.nullCheck) {
+            s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+          } else {
+            boxedExpr.resultTerm
+          }
+          boxedExpr.copy(resultTerm = exprOrNull)
+        }
+      }
+
+// generate function call
+    val functionReference = codeGenerator.addReusableInstance(tableFunction)
+    val functionCallCode =
+      s"""
+        |${parameters.map(_.code).mkString("\n")}
+        |$functionReference.clear();
+        |$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")});
+        |""".stripMargin
+
+// has no result
+    GeneratedExpression(functionReference, "false", functionCallCode, returnType)
--- End diff --

Btw. I created the constant `GeneratedExpression.NEVER_NULL` instead of `"false"` to make it more readable.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90038389
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -1332,16 +1360,17 @@ class CodeGenerator(
   }
 
   /**
-    * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]].
-    * The [[UserDefinedFunction]] must have a default constructor, however, it does not have
+    * Adds a reusable instance (a [[org.apache.flink.api.table.functions.TableFunction]] or
+    * [[org.apache.flink.api.table.functions.ScalarFunction]]) to the member area of the generated
+    * [[Function]]. The instance class must have a default constructor, however, it does not have
     * to be public.
     *
-    * @param function [[UserDefinedFunction]] object to be instantiated during runtime
+    * @param instance object to be instantiated during runtime
     * @return member variable term
     */
-  def addReusableFunction(function: UserDefinedFunction): String = {
-    val classQualifier = function.getClass.getCanonicalName
-    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
+  def addReusableInstance(instance: Any): String = {
--- End diff --

I think it would not hurt if we keep `UserDefinedFunction` and have it as a superclass/superinterface of `ScalarFunction` and `TableFunction` (even without implementation). Better than defining `Any` at certain locations.
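
A minimal sketch of the suggested hierarchy, assuming an empty marker type as described:

    // marker supertype without implementation
    trait UserDefinedFunction

    abstract class ScalarFunction extends UserDefinedFunction
    abstract class TableFunction[T] extends UserDefinedFunction

    object CodeGenSketch {
      // addReusableInstance could then narrow its parameter from Any
      def addReusableInstance(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName
        s"function_${classQualifier.replace('.', '$')}"
      }
    }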




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90050627
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/SqlFunctionUtils.scala ---
@@ -28,14 +28,14 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.api.table.functions.utils.ScalarSqlFunction
+import org.apache.flink.api.table.functions.utils.{TableSqlFunction, ScalarSqlFunction}
 
 import scala.collection.mutable
 
 /**
-  * Global hub for user-defined and built-in advanced SQL scalar functions.
+  * Global hub for user-defined and built-in advanced SQL functions.
   */
-object ScalarFunctions {
+object SqlFunctionUtils {
--- End diff --

We could call this `FunctionGenerator`. IMO it is more than just a util class.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90442225
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---
@@ -451,6 +452,28 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 }
   }
 
+  lazy val tableFunctionCall: PackratParser[LogicalNode] =
+    functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+      case name ~ _ ~ args ~ _ => UnresolvedTableFunctionCall(name.toUpperCase, args)
--- End diff --

I think it would not hurt if we support `split(c) as (f1, f2)` for all expressions. This could also help in cases where we expand the projection like `pojo.flatten() as (f1, f2)`. I don't like the idea of a logical parser. It is already confusing enough for the user that we have Java API expression strings and SQL strings, we don't need another string syntax. @fhueske what do you think?
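
A sketch of such a general suffix rule in the combinator style of `ExpressionParser` (the production names `composite` and `fieldReference` and a multi-name `Alias` constructor are assumptions here, not the existing API):

    // hypothetical: any expression may carry a multi-name alias
    lazy val suffixAsMulti: PackratParser[Expression] =
      composite ~ "as" ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
        case e ~ _ ~ _ ~ names ~ _ =>
          val fieldNames = names.map(_.asInstanceOf[UnresolvedFieldReference].name)
          Alias(e, fieldNames.head, fieldNames.tail)   // hypothetical multi-name Alias
      }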




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90029283
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala ---
@@ -142,4 +143,14 @@ class StreamTableEnvironment(
 asScalaStream(translate(table))
   }
 
+  /**
+    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
+    * Registered functions can be referenced in SQL queries.
--- End diff --

"referenced in Table API and SQL"




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90426441
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{TableConfig, TableException}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Cross/outer applies a user-defined table function.
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(
+  generator: CodeGenerator,
+  udtfTypeInfo: TypeInformation[Any],
+  rowType: RelDataType,
+  rexCall: RexCall,
+  condition: Option[RexNode],
+  config: TableConfig,
+  joinType: SemiJoinType,
+  expectedType: Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+    val crossResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
+      returnType, rowType.getFieldNames.asScala)
+
+val call = generator.generateExpression(rexCall)
+var body =
+  s"""
+ |${call.code}
+ |java.util.Iterator iter = ${call.resultTerm}.getRowsIterator();
+   """.stripMargin
+
+if (joinType == SemiJoinType.INNER) {
+  // cross apply
+  body +=
+s"""
+   |if (!iter.hasNext()) {
+   |  return;
+   |}
+""".stripMargin
+} else if (joinType == SemiJoinType.LEFT) {
+  // outer apply
+      val input2NullExprs = input2AccessExprs.map(
+        x => GeneratedExpression(primitiveDefaultValue(x.resultType), "true", "", x.resultType))
+      val outerResultExpr = generator.generateResultExpression(
+        input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
+  body +=
+s"""
+   |if (!iter.hasNext()) {
+   |  ${outerResultExpr.code}
+           |  ${generator.collectorTerm}.collect(${outerResultExpr.resultTerm});
+   |  return;
+   |}
+""".stripMargin
+} else {
+      throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+}
+
+val projection = if (condition.isEmpty) {
+  s"""
+ |${crossResultExpr.code}
+         |${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
+   """.stripMargin
+} else {
+      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo) {
--- End diff --

Maybe we could also make this configurable instead of extending the generator?
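
A sketch of the configurable alternative, assuming a new constructor parameter (`input1TermName` is a hypothetical name; the real constructor differs):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.table.TableConfig

    // instead of overriding input1Term in an anonymous subclass, the term
    // the generated code reads from could be passed in, with the current
    // behavior as the default
    class ConfigurableCodeGenerator(
        config: TableConfig,
        nullableInput: Boolean,
        input1: TypeInformation[Any],
        input1TermName: String = "in1") {

      def input1Term: String = input1TermName
    }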




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90419246
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import java.util
+
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.ValidationException
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined table function works on
+  * zero, one, or multiple scalar values as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
+  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
+  * If a user-defined table function should not introduce much overhead during runtime, it is
+  * recommended to declare parameters and result types as primitive types instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * Example:
+  *
+  * {{{
+  *
+  *   public class Split extends TableFunction<String> {
+  *
+  *     // implement an "eval" method with as many parameters as you want
+  *     public void eval(String str) {
+  *       for (String s : str.split(" ")) {
+  *         collect(s);   // use collect(...) to emit an output row
+  *       }
+  *     }
+  *
+  *     // eval methods can be overloaded here ...
+  *   }
+  *
+  *   val tEnv: TableEnvironment = ...
+  *   val table: Table = ...   // schema: [a: String]
+  *
+  *   // for Scala users
+  *   val split = new Split()
+  *   table.crossApply(split('a) as ('s)).select('a, 's)
+  *
+  *   // for Java users
+  *   tEnv.registerFunction("split", new Split())   // register table function first
+  *   table.crossApply("split(a) as (s)").select("a, s")
+  *
+  *   // for SQL users
+  *   tEnv.registerFunction("split", new Split())   // register table function first
+  *   tEnv.sql("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
+  *
+  * }}}
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] {
+
+  private val rows: util.ArrayList[T] = new util.ArrayList[T]()
+
+  /**
+* Emit an output row.
+*
+* @param row the output row
+*/
+  protected def collect(row: T): Unit = {
+// cache rows for now, maybe immediately process them further
+rows.add(row)
+  }
+
+  /**
+* Internal use. Get an iterator of the buffered rows.
+*/
+  def getRowsIterator = rows.iterator()
+
+  /**
+* Internal use. Clear buffered rows.
+*/
+  def clear() = rows.clear()
+
+  // ------------------------------------------------------------------
+
+  /**
+    * Returns the result type of the evaluation method with a given signature.
+    *
+    * This method needs to be overridden in case 

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90059865
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
@@ -411,22 +416,31 @@ case class Join(
 right)
 }
     val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
-    Join(node.left, node.right, node.joinType, resolvedCondition)
+    Join(node.left, node.right, node.joinType, resolvedCondition, correlated)
   }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
 left.construct(relBuilder)
 right.construct(relBuilder)
+
+val corSet = Sets.newHashSet[CorrelationId]()
--- End diff --

Can we use a Scala data structure here?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90034800
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -334,18 +337,33 @@ class CodeGenerator(
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
     val input1AccessExprs = for (i <- 0 until input1.getArity)
-      yield generateInputAccess(input1, input1Term, i)
+      yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
 
     val input2AccessExprs = input2 match {
       case Some(ti) => for (i <- 0 until ti.getArity)
-        yield generateInputAccess(ti, input2Term, i)
+        yield generateInputAccess(ti, input2Term, i, input2PojoFieldMapping)
       case None => Seq() // add nothing
     }
 
     generateResultExpression(input1AccessExprs ++ input2AccessExprs, returnType, resultFieldNames)
   }
 
   /**
+    * Generates an expression from the left input and the right table function.
+    */
+  def generateCorrelateAccessExprs: (Seq[GeneratedExpression], Seq[GeneratedExpression]) = {
+    val input1AccessExprs = for (i <- 0 until input1.getArity)
+      yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
+
+    val input2AccessExprs = input2 match {
+      case Some(ti) => for (i <- 0 until ti.getArity)
+        yield generateFieldAccess(ti, input2Term, i, input2PojoFieldMapping)
--- End diff --

Is there a reason why you don't use `generateInputAccess` here? Could save us a bit of generated code?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90033099
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableFunctionCall.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.{Expression, UnresolvedFieldReference}
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.getFieldInfo
+import org.apache.flink.api.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
+
+
+/**
+  * A [[TableFunctionCall]] represents a call to a [[TableFunction]] with actual parameters.
+  *
+  * For Scala users, Flink implicitly converts a [[TableFunction]] into a [[TableFunctionCall]].
+  * For Java users, Flink parses a string expression into a [[TableFunctionCall]]. So users never
+  * need to create a [[TableFunctionCall]] manually.
+  *
+  * @param functionName function name
+  * @param tableFunction user-defined table function
+  * @param parameters actual parameters of function
+  * @param resultType type information of returned table
+  */
+case class TableFunctionCall(
+functionName: String,
+tableFunction: TableFunction[_],
+parameters: Seq[Expression],
+resultType: TypeInformation[_]) {
+
+  private var aliases: Option[Seq[Expression]] = None
+
+  /**
+    * Assigns aliases to the fields returned by this table function, so that the following
+    * `select()` clause can refer to them.
+    *
+    * @param aliasList aliases for the fields returned by this table function
+    * @return this table function call
+    */
+  def as(aliasList: Expression*): TableFunctionCall = {
+this.aliases = Some(aliasList)
+this
+  }
+
+  /**
+* Converts an API class to a logical node for planning.
+*/
+  private[flink] def toLogicalTableFunctionCall(child: LogicalNode): LogicalTableFunctionCall = {
+val originNames = getFieldInfo(resultType)._1
+
+// determine the final field names
+val fieldNames = if (aliases.isDefined) {
+  val aliasList = aliases.get
+  if (aliasList.length != originNames.length) {
+        throw ValidationException(
+          s"List of column aliases must have same degree as table; " +
+            s"the returned table of function '$functionName' has ${originNames.length} " +
+            s"columns (${originNames.mkString(",")}), " +
+            s"whereas alias list has ${aliasList.length} columns")
+      } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+        throw ValidationException("Alias only accepts name expressions as arguments")
+      } else {
+        aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray
+      }
+} else {
+  originNames
+}
+
+LogicalTableFunctionCall(
+  functionName,
+  tableFunction,
+  parameters,
+  resultType,
+  fieldNames,
+  child)
+  }
+}
+
+
+case class TableFunctionCallBuilder[T: TypeInformation](udtf: TableFunction[T]) {
--- End diff --

I would move this class to `org.apache.flink.api.scala.table`. It is Scala API only and fits better next to `expressionDsl.scala`.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90029320
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala ---
@@ -139,4 +140,15 @@ class BatchTableEnvironment(
 wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
   }
 
+  /**
+    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
+    * Registered functions can be referenced in SQL queries.
--- End diff --

"referenced in Table API and SQL"




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90421294
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{TableConfig, TableException}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Cross/outer applies a user-defined table function.
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(
+  generator: CodeGenerator,
+  udtfTypeInfo: TypeInformation[Any],
+  rowType: RelDataType,
+  rexCall: RexCall,
+  condition: Option[RexNode],
+  config: TableConfig,
+  joinType: SemiJoinType,
+  expectedType: Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+    val crossResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
--- End diff --

Please use one line per argument for all method calls in this class. It is very hard to read the code.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90425776
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{TableConfig, TableException}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Cross/outer applies a user-defined table function.
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(
+  generator: CodeGenerator,
+  udtfTypeInfo: TypeInformation[Any],
+  rowType: RelDataType,
+  rexCall: RexCall,
+  condition: Option[RexNode],
+  config: TableConfig,
+  joinType: SemiJoinType,
+  expectedType: Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+    val crossResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
+      returnType, rowType.getFieldNames.asScala)
+
+val call = generator.generateExpression(rexCall)
+var body =
+  s"""
+ |${call.code}
+ |java.util.Iterator iter = ${call.resultTerm}.getRowsIterator();
+   """.stripMargin
+
+if (joinType == SemiJoinType.INNER) {
+  // cross apply
+  body +=
+s"""
+   |if (!iter.hasNext()) {
+   |  return;
+   |}
+""".stripMargin
+} else if (joinType == SemiJoinType.LEFT) {
+  // outer apply
+  val input2NullExprs = input2AccessExprs.map(
--- End diff --

Could you add some more comments in this class? Especially why you are doing certain things.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-12-01 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r90036667
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -753,8 +777,9 @@ class CodeGenerator(
 }
   }
 
-  override def visitCorrelVariable(correlVariable: RexCorrelVariable): GeneratedExpression =
-    throw new CodeGenException("Correlating variables are not supported yet.")
+  override def visitCorrelVariable(correlVariable: RexCorrelVariable): GeneratedExpression = {
--- End diff --

You implemented this method only for table functions, right? Is it possible that other SQL queries that are actually not supported call this method by accident? This would lead to undesired behavior.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r89252908
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+    * Returns the eval method matching the given signature of [[TypeInformation]].
+    */
+  def getEvalMethod(
+function: EvaluableFunction,
--- End diff --

hi @fhueske, when we declare ScalarFunction and TableFunction as traits (`trait ScalarFunction`, `trait TableFunction[T]`), this method becomes very useful. In the current version it is optional.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r89124951
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+    * Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses
--- End diff --

This method is never used. I carelessly introduced it before, so I will remove it.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r89124491
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+    * Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses
+    * [[TypeExtractor]] as default return type inference.
+*/
+  def getResultType(
+tableFunction: TableFunction[_],
+signature: Array[Class[_]])
+  : TypeInformation[_] = {
+// find method for signature
+val evalMethod = tableFunction.getEvalMethods
+  .find(m => signature.sameElements(m.getParameterTypes))
+      .getOrElse(throw new ValidationException("Given signature is invalid."))
+
+val userDefinedTypeInfo = tableFunction.getResultType
+if (userDefinedTypeInfo != null) {
+  userDefinedTypeInfo
+} else {
+  try {
+TypeExtractor.getForClass(evalMethod.getReturnType)
+  } catch {
+case ite: InvalidTypesException =>
+  throw new ValidationException(
+s"Return type of table function '$this' cannot be " +
--- End diff --

This method is never used, so I will remove it.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-18 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88623426
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -154,6 +154,10 @@ under the License.
maven-shade-plugin


+   <phase>package</phase>
+   <goals>
+   <goal>shade</goal>
--- End diff --

If you have tested that a Table API word count can still be executed on the cluster, I'm fine with this change.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-18 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88622767
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined table function works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
+  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
+  * If a user-defined table function should not introduce much overhead during runtime, it is
+  * recommended to declare parameters and result types as primitive types instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with EvaluableFunction {
+
+  private val rows: ListBuffer[T] = new ListBuffer
+
+  /**
+* Emit an output row
+*
+* @param row the output row
+*/
+  protected def collect(row: T): Unit = {
+// cache rows for now, maybe immediately process them further
+rows += row
+  }
+
+
+  @Internal
+  def getRowsIterator = rows.toIterator
+
+  @Internal
+  def clear() = rows.clear()
+
+  // this method will not be called, because we need to register multiple sql functions at one time
+  override private[flink] final def createSqlFunction(
+  name: String,
+  typeFactory: FlinkTypeFactory)
+: SqlFunction = {
+    throw new UnsupportedOperationException("this method should not be called")
--- End diff --

Because we have to register a `TableSqlFunction` for every `eval` method, but the interface `createSqlFunction(name: String, typeFactory: FlinkTypeFactory): SqlFunction` only returns one SqlFunction.

I'm thinking about moving `createSqlFunction` to utils.
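
A small sketch of the fan-out this implies, assuming a utils-style helper (the name `evalOverloads` is hypothetical): one `TableSqlFunction` would be created per returned method, and all of them registered under the same name:

    import org.apache.flink.api.table.functions.TableFunction

    object UdtfRegistrationSketch {
      // collect every public "eval" overload of the UDTF via reflection
      def evalOverloads(udtf: TableFunction[_]): Seq[java.lang.reflect.Method] =
        udtf.getClass.getMethods.toSeq.filter(_.getName == "eval")
    }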




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-18 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88617248
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---
@@ -47,13 +52,50 @@ class FunctionCatalog {
 sqlFunctions += sqlFunction
   }
 
+  /** Register multiple sql functions at one time. The functions have the same name. **/
+  def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
--- End diff --

sure




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-18 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88617192
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -611,6 +612,130 @@ class Table(
   }
 
   /**
+* The Cross Apply returns rows from the outer table (table on the left 
of the Apply operator)
+* that produce matching values from the table-valued function (which 
is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply(split('c).as('s)).select('a,'b,'c,'s)
+* }}}
+*/
+  def crossApply(udtf: TableFunctionCall[_]): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Cross Apply returns rows from the outer table (table on the left 
of the Apply operator)
+* that produce matching values from the table-valued function (which 
is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply("split('c') as (s)").select("a, b, c, s")
+* }}}
+*/
+  def crossApply(udtf: String): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Outer Apply returns all rows from the outer table (table on the left of the Apply
+* operator), and fills in NULL values where the table-valued function (which is on the
+* right side of the operator) produces no matching values.
+*
+* The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.outerApply(split('c).as('s)).select('a,'b,'c,'s)
+* }}}
+*/
+  def outerApply(udtf: TableFunctionCall[_]): Table = {
+applyInternal(udtf, JoinType.LEFT_OUTER)
+  }
+
+  /**
+* The Outer Apply returns all the rows from the outer table (table on the left of the Apply
+* operator); for rows for which the table-valued function (which is on the right side of the
+* operator) produces no matching values, NULL values are displayed.
+*
+* The Outer Apply is equivalent to Left Outer Join, but it works with 
a table-valued function.
+*
+* Example:
+*
+* {{{
+*   val split = new MySplitUDTF()
+*   table.outerApply("split('c') as (s)").select("a, b, c, s")
+* }}}
+*/
+  def outerApply(udtf: String): Table = {
+applyInternal(udtf, JoinType.LEFT_OUTER)
+  }
+
+  private def applyInternal(udtfString: String, joinType: JoinType): Table 
= {
+val node = ExpressionParser.parseLogicalNode(udtfString)
+var alias: Option[Seq[Expression]] = None
+val functionCall = node match {
+  case AliasNode(aliasList, child) =>
+alias = Some(aliasList)
+child
+  case _ => node
+}
+
+functionCall match {
+  case call @ UnresolvedTableFunctionCall(name, args) =>
+val udtfCall = 
tableEnv.getFunctionCatalog.lookupTableFunction(name, args)
+if (alias.isDefined) {
+  applyInternal(udtfCall.as(alias.get: _*), joinType)
+} else {
+  applyInternal(udtfCall, joinType)
+}
+  case _ => throw new TableException("Cross/Outer Apply only accepts a TableFunction")
+}
+  }
+
+  private def applyInternal(node: LogicalNode, joinType: JoinType): Table 
= {
+node match {
+  case udtf: TableFunctionCall[_] =>
+udtf.setChild(this.logicalPlan)
+new Table(
+  tableEnv,
+  Join(this.logicalPlan, udtf.validate(tableEnv), joinType, None,
+   
Some(relBuilder.getCluster.createCorrel())).validate(tableEnv))
--- End diff --

Good idea!
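
For readers, a semantics sketch of cross vs. outer apply on a hypothetical
table `users` with columns (a: Int, tags: String) holding (1, "x#y") and
(2, ""), assuming a split UDTF that emits nothing for empty input:

    // emits one row per non-empty token
    class MySplitUDTF extends TableFunction[String] {
      def eval(str: String): Unit =
        str.split("#").filter(_.nonEmpty).foreach(collect)
    }

    val split = new MySplitUDTF()

    // crossApply drops rows whose UDTF output is empty (inner join semantics)
    users.crossApply(split('tags).as('tag)).select('a, 'tag)
    // => (1, "x"), (1, "y")

    // outerApply keeps such rows and pads the UDTF field with NULL
    // (left outer join semantics)
    users.outerApply(split('tags).as('tag)).select('a, 'tag)
    // => (1, "x"), (1, "y"), (2, null)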



[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-18 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88617047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.reflect.{Method, Type}
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.TableFunction
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
+
+class FlinkTableFunctionImpl[T](val typeInfo: TypeInformation[T],
+val fieldIndexes: Array[Int],
+val fieldNames: Array[String],
+val evalMethod: Method)
+  extends ReflectiveFunctionBase(evalMethod) with TableFunction {
+
+  if (fieldIndexes.length != fieldNames.length) {
+throw new TableException(
+  "Number of field indexes and field names must be equal.")
+  }
+
+  // check uniqueness of field names
+  if (fieldNames.length != fieldNames.toSet.size) {
+throw new TableException(
+  "Table field names must be unique.")
+  }
+
+  val fieldTypes: Array[TypeInformation[_]] =
+typeInfo match {
+  case cType: CompositeType[T] =>
+if (fieldNames.length != cType.getArity) {
+  throw new TableException(
+s"Arity of type (" + cType.getFieldNames.deep + ") " +
--- End diff --

No. `deep` is used to pretty-print a field names array; for example, 
`Array("a", "b", "c").deep.toString` will print "Array(a, b, c)". 

I copied this from `FlinkTable`.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-18 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88615712
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+/**
+  * Flink RelNode that cross/outer applies a user-defined table function to its input.
+  */
+class DataSetCorrelate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+scan: LogicalTableFunctionScan,
+condition: RexNode,
+relRowType: RelDataType,
+joinRowType: RelDataType,
+joinType: SemiJoinType,
+ruleDescription: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkCorrelate
+  with DataSetRel {
+  override def deriveRowType() = relRowType
+
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(getInput) + 10
--- End diff --

I learned this from `DataSetJoin`, which adds the left and right row counts. 
You are right, `* 1.5` is more reasonable here.
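
A sketch of the revised estimate inside `DataSetCorrelate` (the cost weights
are assumed, not taken from the PR):

    override def computeSelfCost(planner: RelOptPlanner,
        metadata: RelMetadataQuery): RelOptCost = {
      // correlate emits roughly the input rows times the UDTF fan-out
      val rowCnt = metadata.getRowCount(getInput) * 1.5
      planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
    }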




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-18 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88614416
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/call.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import java.lang.reflect.Method
+
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.{FlinkTypeFactory, TableEnvironment, 
TableException, UnresolvedException}
+import org.apache.flink.api.table.expressions.{Attribute, Expression, 
ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import 
org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
+import org.apache.flink.api.table.validate.ValidationFailure
+
+import scala.collection.JavaConversions._
+
+/**
+  * General expression for unresolved user-defined table function calls.
+  */
+case class UnresolvedTableFunctionCall(functionName: String, args: 
Seq[Expression])
+  extends LogicalNode {
+
+  override def output: Seq[Attribute] =
+throw UnresolvedException("Invalid call to output on 
UnresolvedTableFunctionCall")
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder =
+throw UnresolvedException("Invalid call to construct on 
UnresolvedTableFunctionCall")
+
+  override private[flink] def children: Seq[LogicalNode] =
+throw UnresolvedException("Invalid call to children on 
UnresolvedTableFunctionCall")
+}
+
+/**
+  * LogicalNode for calling a user-defined table function.
+  * @param tableFunction table function to be called (might be overloaded)
+  * @param parameters actual parameters
+  * @param alias output fields renaming
+  * @tparam T type of returned table
+  */
+case class TableFunctionCall[T: TypeInformation](
+  tableFunction: TableFunction[T],
+  parameters: Seq[Expression],
+  alias: Option[Array[String]]) extends UnaryNode {
+
+  private var table: LogicalNode = _
+  override def child: LogicalNode = table
+
+  def setChild(child: LogicalNode): TableFunctionCall[T] = {
+table = child
+this
+  }
+
+  private val resultType: TypeInformation[T] =
+if (tableFunction.getResultType == null) {
+  implicitly[TypeInformation[T]]
+} else {
+  tableFunction.getResultType
+}
+
+  private val fieldNames: Array[String] =
+if (alias.isEmpty) {
+  getFieldAttribute[T](resultType)._1
+} else {
+  alias.get
+}
+  private val fieldTypes: Array[TypeInformation[_]] = 
getFieldAttribute[T](resultType)._2
+
+  /**
+* Assigns aliases for this table function's returned fields that the 
following `select()` clause
+* can refer to.
+*
+* @param aliasList aliases for the fields returned by this table function
+* @return this table function
+*/
+  def as(aliasList: Expression*): TableFunctionCall[T] = {
+if (aliasList == null) {
+  return this
+}
+if (aliasList.length != fieldNames.length) {
+  failValidation("Aliasing not match number of fields")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("Alias only accept name expressions as arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray
+  TableFunctionCall(tableFunction, parameters, Some(names))
+}
+  }
+
+  override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map {
+case (n, t) =>

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-18 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88614200
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Internal method of [[TableFunction#getResultType()]] that does some 
pre-checking and uses
+* [[TypeExtractor]] as default return type inference.
+*/
+  def getResultType(
+tableFunction: TableFunction[_],
+signature: Array[Class[_]])
+  : TypeInformation[_] = {
+// find method for signature
+val evalMethod = tableFunction.getEvalMethods
+  .find(m => signature.sameElements(m.getParameterTypes))
+  .getOrElse(throw new ValidationException("Given signature is 
invalid."))
+
+val userDefinedTypeInfo = tableFunction.getResultType
+if (userDefinedTypeInfo != null) {
+  userDefinedTypeInfo
+} else {
+  try {
+TypeExtractor.getForClass(evalMethod.getReturnType)
+  } catch {
+case ite: InvalidTypesException =>
+  throw new ValidationException(
+s"Return type of table function '$this' cannot be " +
+  s"automatically determined. Please provide type information 
manually.")
+  }
+}
+  }
+
+  /**
 * Returns the return type of the evaluation method matching the given 
signature.
 */
   def getResultTypeClass(
-  scalarFunction: ScalarFunction,
+  function: EvaluableFunction,
   signature: Array[Class[_]])
 : Class[_] = {
 // find method for signature
-val evalMethod = scalarFunction.getEvalMethods
+val evalMethod = function.getEvalMethods
   .find(m => signature.sameElements(m.getParameterTypes))
   .getOrElse(throw new IllegalArgumentException("Given signature is 
invalid."))
 evalMethod.getReturnType
   }
 
   /**
-* Prints all signatures of a [[ScalarFunction]].
+* Prints all signatures of a [[EvaluableFunction]].
 */
-  def signaturesToString(scalarFunction: ScalarFunction): String = {
-scalarFunction.getSignatures.map(signatureToString).mkString(", ")
+  def signaturesToString(function: EvaluableFunction): String = {
+function.getSignatures.map(signatureToString).mkString(", ")
   }
 
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract the field names and 
positions from.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo(inputType: TypeInformation[_])
+  : (Array[String], Array[Int]) = {
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[_] => t.getFieldNames
--- End diff --

Yes, you are right. We can use `CompositeType` here.
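
A sketch of the `CompositeType`-based variant (the default atomic field name
"f0" is an assumption):

    import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
    import org.apache.flink.api.common.typeutils.CompositeType
    import org.apache.flink.api.table.TableException

    def getFieldInfo(inputType: TypeInformation[_]): (Array[String], Array[Int]) =
      inputType match {
        case ct: CompositeType[_] =>
          // covers tuples, case classes, POJOs and rows alike
          (ct.getFieldNames, (0 until ct.getArity).toArray)
        case _: AtomicType[_] =>
          (Array("f0"), Array(0))
        case other =>
          throw new TableException(s"Unsupported type: $other")
      }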




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88594602
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
 ---
@@ -162,4 +165,24 @@ class BatchTableEnvironment(
 translate[T](table)(typeInfo)
   }
 
+  /**
+* Registers a [[TableFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param tf The TableFunction to register
+*/
+  def registerFunction[T](name: String, tf: TableFunction[T]): Unit ={
+val clazz: Type = tf.getClass.getGenericSuperclass
+val generic = clazz match {
+  case cls: ParameterizedType => cls.getActualTypeArguments.toSeq.head
--- End diff --

I will look into this.
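
For reference, a sketch of why the genericSuperclass approach can miss
(classes are illustrative):

    import org.apache.flink.api.table.functions.TableFunction

    // OK: Split extends TableFunction[String] directly, so
    // classOf[Split].getGenericSuperclass is the ParameterizedType
    // TableFunction[String].
    class Split extends TableFunction[String] {
      def eval(s: String): Unit = collect(s)
    }

    // Not OK: SubSplit's generic superclass is the raw class Split, not a
    // ParameterizedType, so the match above falls through.
    class SubSplit extends Split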




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88594496
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions.utils
+
+import com.google.common.base.Predicate
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
+import org.apache.calcite.util.Util
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.FlinkTypeFactory
+
+import scala.collection.JavaConversions._
+import java.util
+
+import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
+
+/**
+  * Calcite wrapper for user-defined table functions.
+  */
+class TableSqlFunction(
+name: String,
+udtf: TableFunction[_],
+rowTypeInfo: TypeInformation[_],
+returnTypeInference: SqlReturnTypeInference,
+operandTypeInference: SqlOperandTypeInference,
+operandTypeChecker: SqlOperandTypeChecker,
+paramTypes: util.List[RelDataType],
+functionImpl: FlinkTableFunctionImpl[_])
+  extends SqlUserDefinedTableFunction(
+new SqlIdentifier(name, SqlParserPos.ZERO),
+returnTypeInference,
+operandTypeInference,
+operandTypeChecker,
+paramTypes,
+functionImpl) {
+
+  def getTableFunction = udtf
+
+  def getRowTypeInfo = rowTypeInfo
+
+  def getPojoFieldMapping = functionImpl.fieldIndexes
+
+}
+
+object TableSqlFunction {
+  /**
+*
+* @param name function name (used by SQL parser)
+* @param udtf user defined table function to be called
+* @param rowTypeInfo the row type information generated by the table 
function
+* @param typeFactory type factory for converting between Flink's and Calcite's types
+* @param functionImpl calcite table function schema
+* @return
+*/
+  def apply(
+name: String,
+udtf: TableFunction[_],
+rowTypeInfo: TypeInformation[_],
+typeFactory: FlinkTypeFactory,
+functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
+
+val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
+val typeFamilies: util.List[SqlTypeFamily] = new 
util.ArrayList[SqlTypeFamily]
+for (o <- functionImpl.getParameters) {
+  val relType: RelDataType = o.getType(typeFactory)
+  argTypes.add(relType)
+  typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, 
SqlTypeFamily.ANY))
--- End diff --

sure




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88594489
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions.utils
+
+import com.google.common.base.Predicate
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
+import org.apache.calcite.util.Util
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.FlinkTypeFactory
+
+import scala.collection.JavaConversions._
--- End diff --

ok




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88594410
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
 ---
@@ -164,4 +167,24 @@ class StreamTableEnvironment(
 translate[T](table)(typeInfo)
   }
 
+  /**
+* Registers a [[TableFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param tf The TableFunction to register
+*/
+  def registerFunction[T](name: String, tf: TableFunction[T]): Unit ={
--- End diff --

The `registerFunction[T: TypeInformation](name: String, tf: TableFunction[T])` 
only works in Scala. For Java, we need `registerFunction[T](name: String, tf: 
TableFunction[T])` and have to help users extract the type. I don't want Java 
users to see `registerFunction[T: TypeInformation](name: String, tf: 
TableFunction[T])`, so I separated these two functions into the Java and Scala 
TableEnvironments.
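
A sketch of the Scala-side signature described above (registration body
elided):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.table.functions.TableFunction

    def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
      // the context bound supplies the result type implicitly,
      // so no reflection over the generic superclass is needed
      val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
      // ... register a TableSqlFunction per eval method under `name` ...
    }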




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88594421
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -542,11 +563,14 @@ class CodeGenerator(
   inputRef.getIndex - input1.getArity
 }
 
-generateInputAccess(input._1, input._2, index)
+generateInputAccess(input._1, input._2, index, input._3)
+  }
+
+  override def visitFieldAccess(rexFieldAccess: RexFieldAccess): 
GeneratedExpression = {
--- End diff --

sure




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88593490
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -154,6 +154,10 @@ under the License.
maven-shade-plugin


+   package
+   
+   shade
--- End diff --

Yes. #2673 is blocked. Without this, my IDEA editor couldn't resolve the 
shade configuration, so I introduced it in this PR. Should we remove it? 


![image](https://cloud.githubusercontent.com/assets/5378924/20417319/757a9128-ad7f-11e6-99d7-1dc603c425a8.png)





[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88488494
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, 
GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
+
+import scala.collection.JavaConversions._
+
+/**
+  * cross/outer apply a user-defined table function
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(generator: CodeGenerator,
+  udtfTypeInfo: TypeInformation[Any],
+  rowType: RelDataType,
+  rexCall: RexCall,
+  condition: RexNode,
+  config: TableConfig,
+  joinType: SemiJoinType,
+  expectedType: 
Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+val (input1AccessExprs, input2AccessExprs) = 
generator.generateCorrelateAccessExprs
+val crossResultExpr = 
generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
+  returnType, rowType.getFieldNames)
+
+val input2NullExprs = input2AccessExprs.map(
+  x => GeneratedExpression("null", "true", "", x.resultType))
+val outerResultExpr = 
generator.generateResultExpression(input1AccessExprs ++ input2NullExprs,
+  returnType, rowType.getFieldNames)
+
+val call = generator.generateExpression(rexCall)
+var body = call.code +
+   s"""
+  |scala.collection.Iterator iter = 
${call.resultTerm}.getRowsIterator();
+""".stripMargin
+if (joinType == SemiJoinType.INNER) {
+  // cross apply
+  body +=
+s"""
+   |if (iter.isEmpty()) {
+   |  return;
+   |}
+""".stripMargin
+} else {
+  // outer apply
+  body +=
+s"""
+   |if (iter.isEmpty()) {
+   |  ${outerResultExpr.code}
+   |  
${generator.collectorTerm}.collect(${outerResultExpr.resultTerm});
+   |  return;
+   |}
+""".stripMargin
+}
+
+val projection = if (condition == null) {
+  s"""
+ |${crossResultExpr.code}
+ 
|${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
+   """.stripMargin
+} else {
+  val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo) 
{
+override def input1Term: String = input2Term
+  }
+  val filterCondition = filterGenerator.generateExpression(condition)
+  s"""
+ |${filterGenerator.reuseInputUnboxingCode()}
+ |${filterCondition.code}
+ |if (${filterCondition.resultTerm}) {
+ |  ${cross

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88486729
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, 
GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
+
+import scala.collection.JavaConversions._
+
+/**
+  * cross/outer apply a user-defined table function
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(generator: CodeGenerator,
+  udtfTypeInfo: TypeInformation[Any],
+  rowType: RelDataType,
+  rexCall: RexCall,
+  condition: RexNode,
+  config: TableConfig,
+  joinType: SemiJoinType,
+  expectedType: 
Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+val (input1AccessExprs, input2AccessExprs) = 
generator.generateCorrelateAccessExprs
+val crossResultExpr = 
generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
+  returnType, rowType.getFieldNames)
+
+val input2NullExprs = input2AccessExprs.map(
+  x => GeneratedExpression("null", "true", "", x.resultType))
+val outerResultExpr = 
generator.generateResultExpression(input1AccessExprs ++ input2NullExprs,
+  returnType, rowType.getFieldNames)
+
+val call = generator.generateExpression(rexCall)
+var body = call.code +
+   s"""
+  |scala.collection.Iterator iter = 
${call.resultTerm}.getRowsIterator();
--- End diff --

Until now we have pure Java generated code. Do we have to use Scala here?
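
A sketch of a Java-only alternative for that generated line, assuming the
UDTF accessor returned a java.util.Iterator instead:

    // helper producing the generated Java snippet for a given result term
    def iteratorSnippet(resultTerm: String): String =
      s"""
         |java.util.Iterator iter = $resultTerm.getRowsIterator();
         |if (!iter.hasNext()) {
         |  return;
         |}
       """.stripMargin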




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88492728
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
 ---
@@ -47,13 +52,50 @@ class FunctionCatalog {
 sqlFunctions += sqlFunction
   }
 
+  /** Register multiple sql functions at one time. The functions have the same name. **/
+  def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
+if (functions.nonEmpty) {
+  sqlFunctions --= sqlFunctions.filter(_.getName == 
functions.head.getName)
+  sqlFunctions ++= functions
+}
+  }
+
   def getSqlOperatorTable: SqlOperatorTable =
 ChainedSqlOperatorTable.of(
   new BasicOperatorTable(),
   new ListSqlOperatorTable(sqlFunctions)
 )
 
   /**
+* Lookup table function and create an TableFunctionCall if we find a 
match.
+*/
+  def lookupTableFunction[T](name: String, children: Seq[Expression]): 
TableFunctionCall[T] = {
--- End diff --

If `TableFunctionCall` were an expression, we could merge this function 
with the one below.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88458986
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -154,6 +154,10 @@ under the License.
maven-shade-plugin


+   package
+   
+   shade
--- End diff --

Isn't this change part of #2673?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88488089
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, 
GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
+
+import scala.collection.JavaConversions._
+
+/**
+  * cross/outer apply a user-defined table function
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(generator: CodeGenerator,
+  udtfTypeInfo: TypeInformation[Any],
+  rowType: RelDataType,
+  rexCall: RexCall,
+  condition: RexNode,
+  config: TableConfig,
+  joinType: SemiJoinType,
+  expectedType: 
Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+val (input1AccessExprs, input2AccessExprs) = 
generator.generateCorrelateAccessExprs
+val crossResultExpr = 
generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
+  returnType, rowType.getFieldNames)
+
+val input2NullExprs = input2AccessExprs.map(
+  x => GeneratedExpression("null", "true", "", x.resultType))
--- End diff --

We use primitives where possible, so null might result in a compiler error 
for some types.
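
The issue spelled out in generated-Java terms (comments only, illustrative):

    // int s = null;               // does not compile for a primitive result term
    // java.lang.Integer s = null; // compiles for a boxed result term
    // so the outer-apply NULL padding must force boxed (nullable) result types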




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88482480
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions.utils
+
+import com.google.common.base.Predicate
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
+import org.apache.calcite.util.Util
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.FlinkTypeFactory
+
+import scala.collection.JavaConversions._
+import java.util
+
+import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
+
+/**
+  * Calcite wrapper for user-defined table functions.
+  */
+class TableSqlFunction(
+name: String,
+udtf: TableFunction[_],
+rowTypeInfo: TypeInformation[_],
+returnTypeInference: SqlReturnTypeInference,
+operandTypeInference: SqlOperandTypeInference,
+operandTypeChecker: SqlOperandTypeChecker,
+paramTypes: util.List[RelDataType],
+functionImpl: FlinkTableFunctionImpl[_])
+  extends SqlUserDefinedTableFunction(
+new SqlIdentifier(name, SqlParserPos.ZERO),
+returnTypeInference,
+operandTypeInference,
+operandTypeChecker,
+paramTypes,
+functionImpl) {
+
+  def getTableFunction = udtf
+
+  def getRowTypeInfo = rowTypeInfo
+
+  def getPojoFieldMapping = functionImpl.fieldIndexes
+
+}
+
+object TableSqlFunction {
+  /**
+*
+* @param name function name (used by SQL parser)
+* @param udtf user defined table function to be called
+* @param rowTypeInfo the row type information generated by the table 
function
+* @param typeFactory type factory for converting Flink's between 
Calcite's types
+* @param functionImpl calcite table function schema
+* @return
+*/
+  def apply(
+name: String,
+udtf: TableFunction[_],
+rowTypeInfo: TypeInformation[_],
+typeFactory: FlinkTypeFactory,
+functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
+
+val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
+val typeFamilies: util.List[SqlTypeFamily] = new 
util.ArrayList[SqlTypeFamily]
+for (o <- functionImpl.getParameters) {
+  val relType: RelDataType = o.getType(typeFactory)
+  argTypes.add(relType)
+  typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, 
SqlTypeFamily.ANY))
--- End diff --

Could you add some comments in this method?
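
For readers, what that loop computes per eval parameter (descriptive sketch):

    // argTypes     += the parameter's Calcite RelDataType
    // typeFamilies += the SqlTypeFamily used by the operand type checker;
    //                 Util.first(...) falls back to SqlTypeFamily.ANY when
    //                 the SQL type has no family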




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88480160
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined table function works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a 
custom evaluation
+  * method. An evaluation method must be declared publicly and named 
"eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be 
instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by 
Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but 
might be wrong for more
+  * complex, custom, or composite types. In these cases 
[[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive 
values as much as possible.
+  * If a user-defined table function should not introduce much overhead 
during runtime, it is
+  * recommended to declare parameters and result types as primitive types 
instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with 
EvaluableFunction {
+
+  private val rows: ListBuffer[T] = new ListBuffer
+
+  /**
+* Emit an output row
+*
+* @param row the output row
+*/
+  protected def collect(row: T): Unit = {
+    // cache rows for now, maybe immediately process them further
+    rows += row
+  }
+
+
+  @Internal
+  def getRowsIterator = rows.toIterator
+
+  @Internal
+  def clear() = rows.clear()
+
+  // this method will not be called, because we need to register multiple SQL functions at one time
+  override private[flink] final def createSqlFunction(
+  name: String,
+  typeFactory: FlinkTypeFactory)
+: SqlFunction = {
+    throw new UnsupportedOperationException("this method should not be called")
--- End diff --

I asked myself the same. The logic which is currently in `TableEnvironment` 
should actually be here.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88491436
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -611,6 +612,130 @@ class Table(
   }
 
   /**
+* The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
--- End diff --

from




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88477894
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
 ---
@@ -451,6 +452,28 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 }
   }
 
+  lazy val tableFunctionCall: PackratParser[LogicalNode] =
+functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+case name ~ _ ~ args ~ _ => 
UnresolvedTableFunctionCall(name.toUpperCase, args)
--- End diff --

Couldn't `UnresolvedTableFunctionCall` be an expression? It is not very 
nice that we mix up expressions and logical nodes here. We should also think 
about how we want to integrate IN/NOT IN in the future, as we have similar 
problems there.
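
For illustration, the shape this rule produces for the string form consumed
by `applyInternal` (assumed output shape; see table.scala above):

    // ExpressionParser.parseLogicalNode("split(c) as (s)") yields roughly:
    //   AliasNode(Seq(UnresolvedFieldReference("s")),
    //             UnresolvedTableFunctionCall("SPLIT",
    //               Seq(UnresolvedFieldReference("c"))))
    // note that the rule uppercases the function name for catalog lookup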




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88491682
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -611,6 +612,130 @@ class Table(
   }
 
   /**
+* The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
+* that produces matching values from the table-valued function (which 
is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply(split('c).as('s)).select('a,'b,'c,'s)
+* }}}
+*/
+  def crossApply(udtf: TableFunctionCall[_]): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
+* that produces matching values from the table-valued function (which 
is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply("split('c') as (s)").select("a, b, c, s")
+* }}}
+*/
+  def crossApply(udtf: String): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
--- End diff --

form=from


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88491270
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.reflect.{Method, Type}
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.TableFunction
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
+
+class FlinkTableFunctionImpl[T](val typeInfo: TypeInformation[T],
+val fieldIndexes: Array[Int],
+val fieldNames: Array[String],
+val evalMethod: Method)
+  extends ReflectiveFunctionBase(evalMethod) with TableFunction {
+
+  if (fieldIndexes.length != fieldNames.length) {
+throw new TableException(
+  "Number of field indexes and field names must be equal.")
+  }
+
+  // check uniqueness of field names
+  if (fieldNames.length != fieldNames.toSet.size) {
+throw new TableException(
+  "Table field names must be unique.")
+  }
+
+  val fieldTypes: Array[TypeInformation[_]] =
+typeInfo match {
+  case cType: CompositeType[T] =>
+if (fieldNames.length != cType.getArity) {
+  throw new TableException(
+s"Arity of type (" + cType.getFieldNames.deep + ") " +
--- End diff --

Should `deep` be `length` here?
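
For context: on arrays, Scala's `deep` renders the element values while `length` renders the count. Since the message reports an arity, the count is presumably what was meant. A tiny illustration:

    val names = Array("a", "b", "c")
    names.deep.toString   // "Array(a, b, c)" -- prints the field names
    names.length          // 3 -- prints the arity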




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88472140
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -542,11 +563,14 @@ class CodeGenerator(
   inputRef.getIndex - input1.getArity
 }
 
-generateInputAccess(input._1, input._2, index)
+generateInputAccess(input._1, input._2, index, input._3)
+  }
+
+  override def visitFieldAccess(rexFieldAccess: RexFieldAccess): 
GeneratedExpression = {
--- End diff --

Could you maybe also check with #2319? I also changed this method. We might have a large merge conflict here.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88483859
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Internal method of [[ScalarFunction#getResultType()]] that does some 
pre-checking and uses
+* [[TypeExtractor]] as default return type inference.
+*/
+  def getResultType(
+tableFunction: TableFunction[_],
+signature: Array[Class[_]])
+  : TypeInformation[_] = {
+// find method for signature
+val evalMethod = tableFunction.getEvalMethods
+  .find(m => signature.sameElements(m.getParameterTypes))
+  .getOrElse(throw new ValidationException("Given signature is 
invalid."))
+
+val userDefinedTypeInfo = tableFunction.getResultType
+if (userDefinedTypeInfo != null) {
+  userDefinedTypeInfo
+} else {
+  try {
+TypeExtractor.getForClass(evalMethod.getReturnType)
+  } catch {
+case ite: InvalidTypesException =>
+  throw new ValidationException(
+s"Return type of table function '$this' cannot be " +
+  s"automatically determined. Please provide type information 
manually.")
+  }
+}
+  }
+
+  /**
 * Returns the return type of the evaluation method matching the given 
signature.
 */
   def getResultTypeClass(
-  scalarFunction: ScalarFunction,
+  function: EvaluableFunction,
   signature: Array[Class[_]])
 : Class[_] = {
 // find method for signature
-val evalMethod = scalarFunction.getEvalMethods
+val evalMethod = function.getEvalMethods
   .find(m => signature.sameElements(m.getParameterTypes))
   .getOrElse(throw new IllegalArgumentException("Given signature is 
invalid."))
 evalMethod.getReturnType
   }
 
   /**
-* Prints all signatures of a [[ScalarFunction]].
+* Prints all signatures of a [[EvaluableFunction]].
 */
-  def signaturesToString(scalarFunction: ScalarFunction): String = {
-scalarFunction.getSignatures.map(signatureToString).mkString(", ")
+  def signaturesToString(function: EvaluableFunction): String = {
+function.getSignatures.map(signatureToString).mkString(", ")
   }
 
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo(inputType: TypeInformation[_])
+  : (Array[String], Array[Int]) = {
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[_] => t.getFieldNames
--- End diff --

Why don't you use `CompositeType` here?
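
A minimal sketch of that suggestion, assuming the imports already present in the file (`TupleTypeInfo`, `CaseClassTypeInfo` and `PojoTypeInfo` all extend `CompositeType`, so a single case covers them):

    def getFieldInfo(inputType: TypeInformation[_]): (Array[String], Array[Int]) = {
      val fieldNames: Array[String] = inputType match {
        // tuples, case classes and POJOs are all CompositeTypes with field names
        case c: CompositeType[_] => c.getFieldNames
        // atomic types fall back to the default single-field name
        case _: AtomicType[_] => Array("f0")
        case tpe => throw new TableException(s"Type $tpe lacks explicit field naming")
      }
      (fieldNames, fieldNames.indices.toArray)
    }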




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88470935
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
@@ -271,18 +275,33 @@ class CodeGenerator(
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
 val input1AccessExprs = for (i <- 0 until input1.getArity)
-  yield generateInputAccess(input1, input1Term, i)
+  yield generateInputAccess(input1, input1Term, i, 
input1PojoFieldMapping)
 
 val input2AccessExprs = input2 match {
   case Some(ti) => for (i <- 0 until ti.getArity)
-yield generateInputAccess(ti, input2Term, i)
+yield generateInputAccess(ti, input2Term, i, 
input2PojoFieldMapping)
   case None => Seq() // add nothing
 }
 
 generateResultExpression(input1AccessExprs ++ input2AccessExprs, 
returnType, resultFieldNames)
   }
 
   /**
+* Generates an expression from the left input and the right table 
function.
+*/
+  def generateCorrelateAccessExprs: (Seq[GeneratedExpression], 
Seq[GeneratedExpression]) = {
+val input1AccessExprs = for (i <- 0 until input1.getArity)
+  yield generateInputAccess(input1, input1Term, i, 
input1PojoFieldMapping)
+
+val input2AccessExprs = input2 match {
+  case Some(ti) => for (i <- 0 until ti.getArity)
+yield generateFieldAccess(ti, input2Term, i, 
input2PojoFieldMapping)
+  case None => throw new TableException("type information of input2 
must not be null")
--- End diff --

This should be a `CodeGenException`.
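
For illustration, the fix would be a one-line change (assuming `CodeGenException` from the `codegen` package is in scope):

    case None => throw new CodeGenException("type information of input2 must not be null")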




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88463244
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala ---
@@ -164,4 +167,24 @@ class StreamTableEnvironment(
 translate[T](table)(typeInfo)
   }
 
+  /**
+* Registers a [[TableFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param tf The TableFunction to register
+*/
+  def registerFunction[T](name: String, tf: TableFunction[T]): Unit ={
--- End diff --

Is there a reason why this method is not declared in `TableEnvironment`? As far as I can see, this is duplicate code.
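
A hedged sketch of the deduplication: declare the shared registration once in `TableEnvironment` and drop the per-environment copies. This ignores that the Java environments cannot supply the `TypeInformation` context bound implicitly, which may well be the reason for the duplication:

    abstract class TableEnvironment(val config: TableConfig) {
      // shared by the Java/Scala batch and stream environments
      def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
        registerTableFunctionInternal(name, tf)
      }
    }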




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88478530
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala ---
@@ -48,7 +48,7 @@ import org.apache.flink.api.table.{FlinkTypeFactory, 
ValidationException}
   * recommended to declare parameters and result types as primitive types 
instead of their boxed
   * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
   */
-abstract class ScalarFunction extends UserDefinedFunction {
+abstract class ScalarFunction extends UserDefinedFunction with 
EvaluableFunction {
--- End diff --

+1




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88483086
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Internal method of [[ScalarFunction#getResultType()]] that does some 
pre-checking and uses
+* [[TypeExtractor]] as default return type inference.
+*/
+  def getResultType(
+tableFunction: TableFunction[_],
+signature: Array[Class[_]])
+  : TypeInformation[_] = {
+// find method for signature
+val evalMethod = tableFunction.getEvalMethods
+  .find(m => signature.sameElements(m.getParameterTypes))
+  .getOrElse(throw new ValidationException("Given signature is 
invalid."))
+
+val userDefinedTypeInfo = tableFunction.getResultType
+if (userDefinedTypeInfo != null) {
+  userDefinedTypeInfo
+} else {
+  try {
+TypeExtractor.getForClass(evalMethod.getReturnType)
+  } catch {
+case ite: InvalidTypesException =>
+  throw new ValidationException(
+s"Return type of table function '$this' cannot be " +
--- End diff --

If this is used for both methods, then `table function` is wrong.
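
A small sketch of a shared wording, deriving the kind of function from the instance instead of hard-coding "table function" (note that `$this` in the quoted code refers to the utils object, not the function; `function` below is a hypothetical parameter name):

    s"Return type of function '${function.getClass.getCanonicalName}' cannot be " +
      "automatically determined. Please provide type information manually."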




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88484285
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Internal method of [[ScalarFunction#getResultType()]] that does some 
pre-checking and uses
+* [[TypeExtractor]] as default return type inference.
+*/
+  def getResultType(
+tableFunction: TableFunction[_],
+signature: Array[Class[_]])
+  : TypeInformation[_] = {
+// find method for signature
+val evalMethod = tableFunction.getEvalMethods
+  .find(m => signature.sameElements(m.getParameterTypes))
+  .getOrElse(throw new ValidationException("Given signature is 
invalid."))
+
+val userDefinedTypeInfo = tableFunction.getResultType
+if (userDefinedTypeInfo != null) {
+  userDefinedTypeInfo
+} else {
+  try {
+TypeExtractor.getForClass(evalMethod.getReturnType)
+  } catch {
+case ite: InvalidTypesException =>
+  throw new ValidationException(
+s"Return type of table function '$this' cannot be " +
+  s"automatically determined. Please provide type information 
manually.")
+  }
+}
+  }
+
+  /**
 * Returns the return type of the evaluation method matching the given 
signature.
 */
   def getResultTypeClass(
-  scalarFunction: ScalarFunction,
+  function: EvaluableFunction,
   signature: Array[Class[_]])
 : Class[_] = {
 // find method for signature
-val evalMethod = scalarFunction.getEvalMethods
+val evalMethod = function.getEvalMethods
   .find(m => signature.sameElements(m.getParameterTypes))
   .getOrElse(throw new IllegalArgumentException("Given signature is 
invalid."))
 evalMethod.getReturnType
   }
 
   /**
-* Prints all signatures of a [[ScalarFunction]].
+* Prints all signatures of a [[EvaluableFunction]].
 */
-  def signaturesToString(scalarFunction: ScalarFunction): String = {
-scalarFunction.getSignatures.map(signatureToString).mkString(", ")
+  def signaturesToString(function: EvaluableFunction): String = {
+function.getSignatures.map(signatureToString).mkString(", ")
   }
 
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo(inputType: TypeInformation[_])
+  : (Array[String], Array[Int]) = {
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[_] => t.getFieldNames
+  case c: CaseClassTypeInfo[_] => c.getFieldNames
+  case p: PojoTypeInfo[_] => p.getFieldNames
+  case a: AtomicType[_] => Array("f0")
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+(fieldNames, fieldIndexes)
+  }
+
+  /**
+* Returns field names and field types for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
types from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field types.
+*/
+  def getFieldAttribute[A](inputType: TypeInformation[A])
--- End diff --

Maybe merge this method and the method above?
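
One possible shape for the merged helper (a sketch only): return names, positions and types from a single pattern match so the two methods do not duplicate it.

    def getFieldInfo[A](inputType: TypeInformation[A])
      : (Array[String], Array[Int], Array[TypeInformation[_]]) = {
      val (names, types): (Array[String], Array[TypeInformation[_]]) = inputType match {
        case c: CompositeType[A] =>
          // composite types expose both field names and per-field types
          (c.getFieldNames, (0 until c.getArity).map(i => c.getTypeAt(i): TypeInformation[_]).toArray)
        case a: AtomicType[A] =>
          (Array("f0"), Array[TypeInformation[_]](a))
        case tpe =>
          throw new TableException(s"Type $tpe lacks explicit field naming")
      }
      (names, names.indices.toArray, types)
    }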




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88465942
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-* Registers a [[UserDefinedFunction]] under a unique name. Replaces 
already existing
+* Registers a [[ScalarFunction]] under a unique name. Replaces already 
existing
 * user-defined functions under this name.
 */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit 
= {
-function match {
-  case sf: ScalarFunction =>
-// register in Table API
-functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+// register in Table API
+functionCatalog.registerFunction(name, function.getClass)
 
-// register in SQL API
-functionCatalog.registerSqlFunction(sf.getSqlFunction(name, 
typeFactory))
+// register in SQL API
+functionCatalog.registerSqlFunction(function.getSqlFunction(name, 
typeFactory))
--- End diff --

Yes, you are right, this seems to be a bug.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88488325
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, 
GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
+
+import scala.collection.JavaConversions._
+
+/**
+  * cross/outer apply a user-defined table function
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(generator: CodeGenerator,
+  udtfTypeInfo: TypeInformation[Any],
+  rowType: RelDataType,
+  rexCall: RexCall,
+  condition: RexNode,
+  config: TableConfig,
+  joinType: SemiJoinType,
+  expectedType: 
Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+val (input1AccessExprs, input2AccessExprs) = 
generator.generateCorrelateAccessExprs
+val crossResultExpr = 
generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
+  returnType, rowType.getFieldNames)
+
+val input2NullExprs = input2AccessExprs.map(
+  x => GeneratedExpression("null", "true", "", x.resultType))
+val outerResultExpr = 
generator.generateResultExpression(input1AccessExprs ++ input2NullExprs,
+  returnType, rowType.getFieldNames)
+
+val call = generator.generateExpression(rexCall)
+var body = call.code +
--- End diff --

I would put this into the multiline string.
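
I.e., fold the iterator declaration and the generated call code into one interpolated block, roughly (using `call` from the surrounding generator code):

    val body =
      s"""
         |${call.code}
         |scala.collection.Iterator iter = ${call.resultTerm}.getRowsIterator();
         |""".stripMargin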




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88463780
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala ---
@@ -142,4 +143,15 @@ class StreamTableEnvironment(
 asScalaStream(translate(table))
   }
 
+  /**
+* Registers a [[TableFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param tf The TableFunction to register
+*/
+  def registerFunction[T: TypeInformation](name: String, tf: 
TableFunction[T]): Unit ={
--- End diff --

`= {`




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88469480
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-* Registers a [[UserDefinedFunction]] under a unique name. Replaces 
already existing
+* Registers a [[ScalarFunction]] under a unique name. Replaces already 
existing
 * user-defined functions under this name.
 */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit 
= {
-function match {
-  case sf: ScalarFunction =>
-// register in Table API
-functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+// register in Table API
+functionCatalog.registerFunction(name, function.getClass)
 
-// register in SQL API
-functionCatalog.registerSqlFunction(sf.getSqlFunction(name, 
typeFactory))
+// register in SQL API
+functionCatalog.registerSqlFunction(function.getSqlFunction(name, 
typeFactory))
+  }
+
+  /**
+* Registers a [[TableFunction]] under a unique name. Replaces already 
existing
+* user-defined functions under this name.
+*/
+  private[flink] def registerTableFunctionInternal[T: TypeInformation](
+name: String, tf: TableFunction[T]): Unit = {
 
-  case _ =>
-throw new TableException("Unsupported user-defined function type.")
+val typeInfo: TypeInformation[_] = if (tf.getResultType != null) {
+  tf.getResultType
--- End diff --

Even though a user can define their own result type, the following `getFieldInfo` call limits the types that can be used.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88481113
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions.utils
+
+import com.google.common.base.Predicate
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
+import org.apache.calcite.util.Util
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.FlinkTypeFactory
+
+import scala.collection.JavaConversions._
--- End diff --

We try to use `JavaConverters` only.
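
For reference, the `JavaConverters` style makes each conversion explicit at the call site instead of pulling in implicit conversions (illustrative snippet, not from the patch):

    import scala.collection.JavaConverters._

    val javaNames = new java.util.ArrayList[String]()
    javaNames.add("split")
    // the conversion is visible as an explicit .asScala call
    val scalaNames: Seq[String] = javaNames.asScala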




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88485735
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/call.scala ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import java.lang.reflect.Method
+
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.{FlinkTypeFactory, TableEnvironment, 
TableException, UnresolvedException}
+import org.apache.flink.api.table.expressions.{Attribute, Expression, 
ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import 
org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
+import org.apache.flink.api.table.validate.ValidationFailure
+
+import scala.collection.JavaConversions._
+
+/**
+  * General expression for unresolved user-defined table function calls.
--- End diff --

This is not an expression.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88493178
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---
@@ -47,13 +52,50 @@ class FunctionCatalog {
 sqlFunctions += sqlFunction
   }
 
+  /** Register multiple sql functions at one time. The functions has the 
same name. **/
+  def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
+if (functions.nonEmpty) {
+  sqlFunctions --= sqlFunctions.filter(_.getName == 
functions.head.getName)
+  sqlFunctions ++= functions
+}
+  }
+
   def getSqlOperatorTable: SqlOperatorTable =
 ChainedSqlOperatorTable.of(
   new BasicOperatorTable(),
   new ListSqlOperatorTable(sqlFunctions)
 )
 
   /**
+* Lookup table function and create an TableFunctionCall if we find a 
match.
+*/
+  def lookupTableFunction[T](name: String, children: Seq[Expression]): 
TableFunctionCall[T] = {
+val funcClass = functionBuilders
+  .getOrElse(name.toLowerCase, throw ValidationException(s"Undefined 
function: $name"))
+funcClass match {
+  // user-defined table function call
+  case tf if classOf[TableFunction[T]].isAssignableFrom(tf) =>
+
Try(UserDefinedFunctionUtils.instantiate(tf.asInstanceOf[Class[TableFunction[T]]]))
 match {
+  case Success(tableFunction) => {
+val clazz: Type = tableFunction.getClass.getGenericSuperclass
--- End diff --

As I said earlier, this and the following lines are very error-prone. We should keep calls to the TypeExtractor to a minimum. The TypeExtractor is intended only for Java, not Scala.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88406551
  
--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/UserDefinedTableFunctionITCase.java ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.functions.TableFunction;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+
+@RunWith(Parameterized.class)
+public class UserDefinedTableFunctionITCase extends TableProgramsTestBase {
+
+   public UserDefinedTableFunctionITCase(TestExecutionMode mode, 
TableConfigMode configMode){
+   super(mode, configMode);
+   }
+
+
+   @Test
+   public void testUDTF() throws Exception {
--- End diff --

sure




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88406527
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined 
table functions works on
+  * one row as input and returns multiple rows as output.
--- End diff --

You are right.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88342301
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, 
LogicalTableFunctionScan}
+import org.apache.calcite.rex.RexNode
+import 
org.apache.flink.api.table.plan.nodes.datastream.{DataStreamCorrelate, 
DataStreamConvention}
+
+/**
+  * parser cross/outer apply
+  */
+class DataStreamCorrelateRule
+  extends ConverterRule(
+classOf[LogicalCorrelate],
+Convention.NONE,
+DataStreamConvention.INSTANCE,
+"DataStreamCorrelateRule")
+{
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+
+right match {
+  // right node is a table function
+  case scan: LogicalTableFunctionScan => true
+  // a filter is pushed above the table function
+  case filter: LogicalFilter =>
+filter.getInput.asInstanceOf[RelSubset].getOriginal
+  .isInstanceOf[LogicalTableFunctionScan]
+  case _ => false
+}
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
+val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+val convInput: RelNode = RelOptRule.convert(join.getInput(0), 
DataStreamConvention.INSTANCE)
+val right: RelNode = join.getInput(1)
+
+def convertToCorrelate(relNode: RelNode, condition: RexNode): 
DataStreamCorrelate = {
--- End diff --

Define `condition` as `Option[RexNode]` so we do not have to use `null`.
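
Sketch of the suggested signature, with `None` standing in for "no filter above the table function" (names as in the rule under review, body elided):

    def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataStreamCorrelate = ???

    // call sites then read:
    //   convertToCorrelate(scan, None)                        // bare table function scan
    //   convertToCorrelate(scan, Some(filter.getCondition))   // filter pushed above the UDTF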




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88339990
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, 
GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
+
+import scala.collection.JavaConversions._
+
+/**
+  * cross/outer apply a user-defined table function
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(generator: CodeGenerator,
+  udtfTypeInfo: TypeInformation[Any],
+  rowType: RelDataType,
+  rexCall: RexCall,
+  condition: RexNode,
+  config: TableConfig,
+  joinType: SemiJoinType,
+  expectedType: 
Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+val (input1AccessExprs, input2AccessExprs) = 
generator.generateCorrelateAccessExprs
+val crossResultExpr = 
generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
+  returnType, rowType.getFieldNames)
+
+val input2NullExprs = input2AccessExprs.map(
+  x => GeneratedExpression("null", "true", "", x.resultType))
+val outerResultExpr = 
generator.generateResultExpression(input1AccessExprs ++ input2NullExprs,
+  returnType, rowType.getFieldNames)
+
+val call = generator.generateExpression(rexCall)
+var body = call.code +
+   s"""
+  |scala.collection.Iterator iter = 
${call.resultTerm}.getRowsIterator();
+""".stripMargin
+if (joinType == SemiJoinType.INNER) {
+  // cross apply
+  body +=
+s"""
+   |if (iter.isEmpty()) {
+   |  return;
+   |}
+""".stripMargin
+} else {
--- End diff --

I think it would be safer to add an `else if (joinType == SemiJoinType.LEFT)` here and throw an exception in `else`.
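
A minimal sketch of that dispatch (exception type chosen for illustration):

    if (joinType == SemiJoinType.INNER) {
      // cross apply: skip left rows for which the UDTF produced no rows
    } else if (joinType == SemiJoinType.LEFT) {
      // outer apply: emit one null-padded result row instead
    } else {
      throw new TableException(s"Unsupported SemiJoinType: $joinType")
    }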




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88348205
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, 
LogicalTableFunctionScan}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetCorrelate}
+
+/**
+  * parser cross/outer apply
--- End diff --

Replace with "Rule to convert a LogicalCorrelate into a DataSetCorrelate."




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88337239
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/call.scala ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import java.lang.reflect.Method
+
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.{FlinkTypeFactory, TableEnvironment, 
TableException, UnresolvedException}
+import org.apache.flink.api.table.expressions.{Attribute, Expression, 
ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import 
org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
+import org.apache.flink.api.table.validate.ValidationFailure
+
+import scala.collection.JavaConversions._
+
+/**
+  * General expression for unresolved user-defined table function calls.
+  */
+case class UnresolvedTableFunctionCall(functionName: String, args: 
Seq[Expression])
+  extends LogicalNode {
+
+  override def output: Seq[Attribute] =
+throw UnresolvedException("Invalid call to output on 
UnresolvedTableFunctionCall")
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder =
+throw UnresolvedException("Invalid call to construct on 
UnresolvedTableFunctionCall")
+
+  override private[flink] def children: Seq[LogicalNode] =
+throw UnresolvedException("Invalid call to children on 
UnresolvedTableFunctionCall")
+}
+
+/**
+  * LogicalNode for calling a user-defined table functions.
+  * @param tableFunction table function to be called (might be overloaded)
+  * @param parameters actual parameters
+  * @param alias output fields renaming
+  * @tparam T type of returned table
+  */
+case class TableFunctionCall[T: TypeInformation](
+  tableFunction: TableFunction[T],
+  parameters: Seq[Expression],
+  alias: Option[Array[String]]) extends UnaryNode {
+
+  private var table: LogicalNode = _
+  override def child: LogicalNode = table
+
+  def setChild(child: LogicalNode): TableFunctionCall[T] = {
+table = child
+this
+  }
+
+  private val resultType: TypeInformation[T] =
+if (tableFunction.getResultType == null) {
+  implicitly[TypeInformation[T]]
+} else {
+  tableFunction.getResultType
+}
+
+  private val fieldNames: Array[String] =
+if (alias.isEmpty) {
+  getFieldAttribute[T](resultType)._1
+} else {
+  alias.get
+}
+  private val fieldTypes: Array[TypeInformation[_]] = 
getFieldAttribute[T](resultType)._2
+
+  /**
+* Assigns an alias for this table function returned fields that the 
following `select()` clause
+* can refer to.
+*
+* @param aliasList alias for this window
+* @return this table function
+*/
+  def as(aliasList: Expression*): TableFunctionCall[T] = {
+if (aliasList == null) {
+  return this
+}
+if (aliasList.length != fieldNames.length) {
+  failValidation("Aliasing not match number of fields")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("Alias only accept name expressions as arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray
+  TableFunctionCall(tableFunction, parameters, Some(names))
+}
+  }
+
+  override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map {
+case (n, t) =>

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88339350
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, 
GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
+
+import scala.collection.JavaConversions._
+
+/**
+  * cross/outer apply a user-defined table function
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(generator: CodeGenerator,
--- End diff --

Can you change the constructor parameter wrapping and indentation to be similar to `DataSetCorrelate`?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88336779
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Internal method of [[ScalarFunction#getResultType()]] that does some 
pre-checking and uses
+* [[TypeExtractor]] as default return type inference.
+*/
+  def getResultType(
+tableFunction: TableFunction[_],
+signature: Array[Class[_]])
+  : TypeInformation[_] = {
+// find method for signature
+val evalMethod = tableFunction.getEvalMethods
+  .find(m => signature.sameElements(m.getParameterTypes))
+  .getOrElse(throw new ValidationException("Given signature is 
invalid."))
+
+val userDefinedTypeInfo = tableFunction.getResultType
+if (userDefinedTypeInfo != null) {
+  userDefinedTypeInfo
+} else {
+  try {
+TypeExtractor.getForClass(evalMethod.getReturnType)
+  } catch {
+case ite: InvalidTypesException =>
+  throw new ValidationException(
+s"Return type of table function '$this' cannot be " +
+  s"automatically determined. Please provide type information 
manually.")
+  }
+}
+  }
+
+  /**
 * Returns the return type of the evaluation method matching the given 
signature.
 */
   def getResultTypeClass(
-  scalarFunction: ScalarFunction,
+  function: EvaluableFunction,
--- End diff --

I think `UserDefinedFunction` would be better.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88338123
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
@@ -611,6 +612,130 @@ class Table(
   }
 
   /**
+* The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
+* that produces matching values from the table-valued function (which 
is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply(split('c).as('s)).select('a,'b,'c,'s)
+* }}}
+*/
+  def crossApply(udtf: TableFunctionCall[_]): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
+* that produces matching values from the table-valued function (which 
is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply("split('c') as (s)").select("a, b, c, s")
+* }}}
+*/
+  def crossApply(udtf: String): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
--- End diff --

`Cross Apply` should be `Outer Apply`. Please check the complete docs for this method.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88347146
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/EvaluableFunction.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.functions
+
+import java.lang.reflect.{Modifier, Method}
+import org.apache.flink.api.table.ValidationException
+
+/**
+  * User-defined functions that have eval methods can extend this trait to 
reuse the same logic, such as:
+  * [[ScalarFunction]] and [[TableFunction]].
+  */
+trait EvaluableFunction {
+
+  private lazy val evalMethods = checkAndExtractEvalMethods()
+  private lazy val signatures = evalMethods.map(_.getParameterTypes)
+
+  /**
+* Extracts evaluation methods and throws a [[ValidationException]] if 
no implementation
+* can be found.
+*/
+  private def checkAndExtractEvalMethods(): Array[Method] = {
+val methods = getClass
+  .getDeclaredMethods
+  .filter { m =>
+val modifiers = m.getModifiers
+m.getName == "eval" && Modifier.isPublic(modifiers) && 
!Modifier.isAbstract(modifiers)
+  }
+
+if (methods.isEmpty) {
+  throw new ValidationException(s"Table function class '$this' does 
not implement at least " +
--- End diff --

If this method is also used for `ScalarFunction`, the exception message 
should be adapted accordingly.
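
For example, the message could be derived from the runtime class so the same 
trait serves both function kinds (a sketch; the exact wording is an assumption):

```scala
if (methods.isEmpty) {
  // use the concrete class name instead of hard-coding "Table function",
  // so ScalarFunction subclasses get an accurate message as well
  throw new ValidationException(
    s"Function class '${getClass.getCanonicalName}' does not implement at least " +
      "one public, non-abstract method named 'eval'.")
}
```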




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88345307
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+/**
+  * Flink RelNode that executes a cross (or outer) apply of a user-defined 
table function.
+  */
+class DataSetCorrelate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+scan: LogicalTableFunctionScan,
+condition: RexNode,
+relRowType: RelDataType,
+joinRowType: RelDataType,
+joinType: SemiJoinType,
+ruleDescription: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkCorrelate
+  with DataSetRel {
+  override def deriveRowType() = relRowType
+
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(getInput) + 10
+planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetCorrelate(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  scan,
+  condition,
+  relRowType,
+  joinRowType,
+  joinType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+val rexCall = scan.getCall.asInstanceOf[RexCall]
+val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+correlateToString(rexCall, sqlFunction)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val rexCall = scan.getCall.asInstanceOf[RexCall]
+val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+super.explainTerms(pw)
+  .item("lateral", correlateToString(rexCall, sqlFunction))
+  .item("select", selectToString(relRowType))
+  }
+
+
+  override def translateToPlan(tableEnv: BatchTableEnvironment,
+   expectedType: 
Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val config = tableEnv.getConfig
+val returnType = determineReturnType(
+  getRowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+val inputDS = inputNode.asInstanceOf[DataSetRel]
+  .translateToPlan(tableEnv, Some(inputRowType(inputNode)))
--- End diff --

I think we can replace `Some(inputRowType(inputNode))` by 
`Some(TypeConverter.DEFAULT_ROW_TYPE)` (similar to 
`DataSetAggregate.translateToPlan()`).

Then we can also remove the method `FlinkCorrelate.inputRowType`.
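
A sketch of the suggested replacement (assuming `TypeConverter.DEFAULT_ROW_TYPE` 
as used in `DataSetAggregate.translateToPlan()`):

```scala
// translate the input with the default row type instead of deriving it
// from the input node, making FlinkCorrelate.inputRowType unnecessary
val inputDS = inputNode.asInstanceOf[DataSetRel]
  .translateToPlan(tableEnv, Some(TypeConverter.DEFAULT_ROW_TYPE))
```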



[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88348343
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.reflect.{Method, Type}
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.TableFunction
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
+
+class FlinkTableFunctionImpl[T](val typeInfo: TypeInformation[T],
--- End diff --

Please add a brief description of the class.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88347669
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined 
table function works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a 
custom evaluation
+  * method. An evaluation method must be declared publicly and named 
"eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be 
instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by 
Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but 
might be wrong for more
+  * complex, custom, or composite types. In these cases 
[[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive 
values as much as possible.
+  * If a user-defined table function should not introduce much overhead 
during runtime, it is
+  * recommended to declare parameters and result types as primitive types 
instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with 
EvaluableFunction {
+
+  private val rows: ListBuffer[T] = new ListBuffer
+
+  /**
+* Emit an output row
+*
+* @param row the output row
+*/
+  protected def collect(row: T): Unit = {
+// cache rows for now, maybe immediately process them further
+rows += row
+  }
+
+
+  @Internal
+  def getRowsIterator = rows.toIterator
+
+  @Internal
+  def clear() = rows.clear()
+
+  // this method will not be called, because we need to register multiple 
sql functions at one time
+  override private[flink] final def createSqlFunction(
+  name: String,
+  typeFactory: FlinkTypeFactory)
+: SqlFunction = {
+throw new UnsupportedOperationException("this method should not be 
called")
--- End diff --

Why is this method not necessary for `TableFunction`?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88337706
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/call.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import java.lang.reflect.Method
+
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.{FlinkTypeFactory, TableEnvironment, 
TableException, UnresolvedException}
+import org.apache.flink.api.table.expressions.{Attribute, Expression, 
ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import 
org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
+import org.apache.flink.api.table.validate.ValidationFailure
+
+import scala.collection.JavaConversions._
+
+/**
+  * General expression for unresolved user-defined table function calls.
+  */
+case class UnresolvedTableFunctionCall(functionName: String, args: 
Seq[Expression])
+  extends LogicalNode {
+
+  override def output: Seq[Attribute] =
+throw UnresolvedException("Invalid call to output on 
UnresolvedTableFunctionCall")
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder =
+throw UnresolvedException("Invalid call to construct on 
UnresolvedTableFunctionCall")
+
+  override private[flink] def children: Seq[LogicalNode] =
+throw UnresolvedException("Invalid call to children on 
UnresolvedTableFunctionCall")
+}
+
+/**
+  * LogicalNode for calling a user-defined table functions.
+  * @param tableFunction table function to be called (might be overloaded)
+  * @param parameters actual parameters
+  * @param alias output fields renaming
+  * @tparam T type of returned table
+  */
+case class TableFunctionCall[T: TypeInformation](
+  tableFunction: TableFunction[T],
+  parameters: Seq[Expression],
+  alias: Option[Array[String]]) extends UnaryNode {
+
+  private var table: LogicalNode = _
+  override def child: LogicalNode = table
+
+  def setChild(child: LogicalNode): TableFunctionCall[T] = {
+table = child
+this
+  }
+
+  private val resultType: TypeInformation[T] =
+if (tableFunction.getResultType == null) {
+  implicitly[TypeInformation[T]]
+} else {
+  tableFunction.getResultType
+}
+
+  private val fieldNames: Array[String] =
+if (alias.isEmpty) {
+  getFieldAttribute[T](resultType)._1
+} else {
+  alias.get
+}
+  private val fieldTypes: Array[TypeInformation[_]] = 
getFieldAttribute[T](resultType)._2
+
+  /**
+* Assigns aliases to the fields returned by this table function, which 
the following `select()`
+* clause can refer to.
+*
+* @param aliasList aliases for the returned fields
+* @return this table function
+*/
+  def as(aliasList: Expression*): TableFunctionCall[T] = {
+if (aliasList == null) {
+  return this
+}
+if (aliasList.length != fieldNames.length) {
+  failValidation("Number of aliases does not match the number of fields")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("Alias only accepts name expressions as arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray
+  TableFunctionCall(tableFunction, parameters, Some(names))
+}
+  }
+
+  override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map {
+case (n, t) =>

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88343373
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+/**
+  * Flink RelNode that executes a cross (or outer) apply of a user-defined 
table function.
+  */
+class DataSetCorrelate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+scan: LogicalTableFunctionScan,
+condition: RexNode,
--- End diff --

use `Option[RexNode]` for `condition`
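
A sketch of the suggested signature change (surrounding parameters unchanged):

```scala
class DataSetCorrelate(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputNode: RelNode,
    scan: LogicalTableFunctionScan,
    condition: Option[RexNode], // None when no filter sits on top of the function scan
    relRowType: RelDataType,
    joinRowType: RelDataType,
    joinType: SemiJoinType,
    ruleDescription: String)
```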




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88345505
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.reflect.{Method, Type}
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.TableFunction
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
+
+class FlinkTableFunctionImpl[T](val typeInfo: TypeInformation[T],
--- End diff --

Please indent parameters as in `DataSetCorrelate`.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88335427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined 
table function works on
+  * one row as input and returns multiple rows as output.
--- End diff --

change to `"... works on zero, one, or multiple scalar values as input 
..."`. It does not need to be a complete row, right?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88343933
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+/**
+  * Flink RelNode that executes a cross (or outer) apply of a user-defined 
table function.
+  */
+class DataSetCorrelate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+scan: LogicalTableFunctionScan,
+condition: RexNode,
+relRowType: RelDataType,
+joinRowType: RelDataType,
+joinType: SemiJoinType,
+ruleDescription: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkCorrelate
+  with DataSetRel {
+  override def deriveRowType() = relRowType
+
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(getInput) + 10
--- End diff --

Not that it would actually matter at the moment, but why are you adding a 
constant here? Shouldn't it be something like `* 1.5` instead?
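
One way to express that (a sketch; the factor is only illustrative):

```scala
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
  // scale the input cardinality instead of adding a constant offset
  val rowCnt = metadata.getRowCount(getInput) * 1.5
  planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
}
```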




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88346931
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/EvaluableFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
--- End diff --

+1 I think it's a good idea to move these methods.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88346802
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
 ---
@@ -47,13 +52,50 @@ class FunctionCatalog {
 sqlFunctions += sqlFunction
   }
 
+  /** Registers multiple sql functions at one time. The functions must have 
the same name. **/
+  def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
+if (functions.nonEmpty) {
+  sqlFunctions --= sqlFunctions.filter(_.getName == 
functions.head.getName)
+  sqlFunctions ++= functions
+}
+  }
+
   def getSqlOperatorTable: SqlOperatorTable =
 ChainedSqlOperatorTable.of(
   new BasicOperatorTable(),
   new ListSqlOperatorTable(sqlFunctions)
 )
 
   /**
+* Looks up a table function and creates a TableFunctionCall if we find 
a match.
+*/
+  def lookupTableFunction[T](name: String, children: Seq[Expression]): 
TableFunctionCall[T] = {
+val funcClass = functionBuilders
+  .getOrElse(name.toLowerCase, throw ValidationException(s"Undefined 
function: $name"))
+funcClass match {
+  // user-defined table function call
+  case tf if classOf[TableFunction[T]].isAssignableFrom(tf) =>
+
Try(UserDefinedFunctionUtils.instantiate(tf.asInstanceOf[Class[TableFunction[T]]]))
 match {
+  case Success(tableFunction) => {
+val clazz: Type = tableFunction.getClass.getGenericSuperclass
+val generic = clazz match {
+  case cls: ParameterizedType => 
cls.getActualTypeArguments.toSeq.head
+  case _ => throw new TableException(
+"New TableFunction classes need to inherit from 
TableFunction class," +
+  " and state the generic type.")
+}
+implicit val typeInfo: TypeInformation[T] = 
TypeExtractor.createTypeInfo(generic)
+  .asInstanceOf[TypeInformation[T]]
+TableFunctionCall(tableFunction, children, None)
+  }
+  case Failure(e) => throw ValidationException(e.getMessage)
+}
+  case _ =>
+throw ValidationException("Unsupported table function.")
--- End diff --

I think this exception message could be improved. It is thrown if the 
registered function does not implement the `TableFunction` interface.
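
A possible rewording (a sketch; the exact text is an assumption):

```scala
case _ =>
  throw ValidationException(
    s"Function '$name' does not implement the TableFunction interface " +
      "and cannot be used in a cross/outer apply.")
```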




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88344259
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * Flink RelNode that executes a cross (or outer) apply of a user-defined 
table function.
+  */
+class DataStreamCorrelate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+scan: LogicalTableFunctionScan,
+condition: RexNode,
+relRowType: RelDataType,
+joinRowType: RelDataType,
+joinType: SemiJoinType,
+ruleDescription: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkCorrelate
+  with DataStreamRel {
+  override def deriveRowType() = relRowType
+
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(getInput) + 10
+planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamCorrelate(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  scan,
+  condition,
+  relRowType,
+  joinRowType,
+  joinType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+val funcRel = unwrap(scan)
+val rexCall = funcRel.getCall.asInstanceOf[RexCall]
+val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+correlateToString(rexCall, sqlFunction)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val funcRel = unwrap(scan)
+val rexCall = funcRel.getCall.asInstanceOf[RexCall]
+val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+super.explainTerms(pw)
+  .item("lateral", correlateToString(rexCall, sqlFunction))
+  .item("select", selectToString(relRowType))
+  }
+
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment,
+   expectedType: 
Option[TypeInformation[Any]]): DataStream[Any] = {
--- End diff --

Please indent these similarly to the constructor parameters of this class.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88336518
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Returns eval method matching the given signature of 
[[TypeInformation]].
+*/
+  def getEvalMethod(
+function: EvaluableFunction,
--- End diff --

Wouldn't `Class[_]` be more generic than necessary? `UserDefinedFunction` 
would work as well, no?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88342273
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, 
LogicalTableFunctionScan}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetCorrelate}
+
+/**
+  * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
+  */
+class DataSetCorrelateRule
+  extends ConverterRule(
+  classOf[LogicalCorrelate],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetCorrelateRule")
+  {
+
+override def matches(call: RelOptRuleCall): Boolean = {
+  val join: LogicalCorrelate = 
call.rel(0).asInstanceOf[LogicalCorrelate]
+  val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+
+
+  right match {
+// right node is a table function
+case scan: LogicalTableFunctionScan => true
+// a filter is pushed above the table function
+case filter: LogicalFilter =>
+  filter.getInput.asInstanceOf[RelSubset].getOriginal
+.isInstanceOf[LogicalTableFunctionScan]
+case _ => false
+  }
+}
+
+override def convert(rel: RelNode): RelNode = {
+  val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
+  val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+  val convInput: RelNode = RelOptRule.convert(join.getInput(0), 
DataSetConvention.INSTANCE)
+  val right: RelNode = join.getInput(1)
+
+  def convertToCorrelate(relNode: RelNode, condition: RexNode): 
DataSetCorrelate = {
--- End diff --

define `condition` as `Option[RexNode]` so we do not have to use `null`
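
A sketch of how the two match arms could then call the helper (assuming 
`LogicalFilter.getCondition` supplies the predicate):

```scala
right match {
  case scan: LogicalTableFunctionScan =>
    // no filter on top of the table function scan
    convertToCorrelate(scan, None)
  case filter: LogicalFilter =>
    // a filter was pushed above the table function scan
    val scan = filter.getInput.asInstanceOf[RelSubset].getOriginal
    convertToCorrelate(scan, Some(filter.getCondition))
}
```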




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88345712
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
 ---
@@ -47,13 +52,50 @@ class FunctionCatalog {
 sqlFunctions += sqlFunction
   }
 
+  /** Registers multiple sql functions at one time. The functions must have 
the same name. **/
+  def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
--- End diff --

Please check that all functions have the same name.
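
A minimal sketch of such a check (failing with a `ValidationException` is an 
assumption):

```scala
def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
  if (functions.nonEmpty) {
    val name = functions.head.getName
    // reject batches that mix functions of different names
    if (!functions.forall(_.getName == name)) {
      throw ValidationException("All functions registered at once must share the same name.")
    }
    sqlFunctions --= sqlFunctions.filter(_.getName == name)
    sqlFunctions ++= functions
  }
}
```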




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88345175
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * Flink RelNode that executes a cross (or outer) apply of a user-defined 
table function.
+  */
+class DataStreamCorrelate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+scan: LogicalTableFunctionScan,
+condition: RexNode,
+relRowType: RelDataType,
+joinRowType: RelDataType,
+joinType: SemiJoinType,
+ruleDescription: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkCorrelate
+  with DataStreamRel {
+  override def deriveRowType() = relRowType
+
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(getInput) + 10
+planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamCorrelate(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  scan,
+  condition,
+  relRowType,
+  joinRowType,
+  joinType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+val funcRel = unwrap(scan)
+val rexCall = funcRel.getCall.asInstanceOf[RexCall]
+val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+correlateToString(rexCall, sqlFunction)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val funcRel = unwrap(scan)
+val rexCall = funcRel.getCall.asInstanceOf[RexCall]
+val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+super.explainTerms(pw)
+  .item("lateral", correlateToString(rexCall, sqlFunction))
+  .item("select", selectToString(relRowType))
+  }
+
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment,
+   expectedType: 
Option[TypeInformation[Any]]): DataStream[Any] = {
+
+val config = tableEnv.getConfig
+val returnType = determineReturnType(
+  getRowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+val inputDS = inputNode.asInstanceOf[DataStreamRel]
+  .translateToPlan(tableEnv, Some(inputRowType(inputNode)))
--- End diff --

I think we can replace `Some(inputRowType(inputNode))` by 
`Some(TypeConverter.DEFAULT_ROW_TYPE)` (similar to 
`DataSetAggregate.translateToPlan()`).

Then we can also remove the method `FlinkCorrelate.inputRowType`.



[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88325623
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/UserDefinedTableFunctionITCase.java
 ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.functions.TableFunction;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+
+@RunWith(Parameterized.class)
+public class UserDefinedTableFunctionITCase extends TableProgramsTestBase {
+
+   public UserDefinedTableFunctionITCase(TestExecutionMode mode, 
TableConfigMode configMode){
+   super(mode, configMode);
+   }
+
+
+   @Test
+   public void testUDTF() throws Exception {
--- End diff --

rename to `testUDTFWithCrossApply`?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88343322
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * Flink RelNode that executes a cross (or outer) apply of a user-defined 
table function.
+  */
+class DataStreamCorrelate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+scan: LogicalTableFunctionScan,
+condition: RexNode,
--- End diff --

use `Option[RexNode]` for `condition`




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88337810
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -17,20 +17,30 @@
  */
 package org.apache.flink.api.table.plan.logical
 
+import java.lang.reflect.Method
+
+import com.google.common.collect.Sets
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.rel.core.CorrelationId
--- End diff --

Most of the added imports are unused.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88348227
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, 
LogicalTableFunctionScan}
+import org.apache.calcite.rex.RexNode
+import 
org.apache.flink.api.table.plan.nodes.datastream.{DataStreamCorrelate, 
DataStreamConvention}
+
+/**
+  * parser cross/outer apply
--- End diff --

Replace by "Rule to convert a LogicalCorrelate into a DataStreamCorrelate.".




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88339010
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -611,6 +612,130 @@ class Table(
   }
 
   /**
+* The Cross Apply returns rows from the outer table (table on the left 
of the Apply operator)
+* that produce matching values from the table-valued function (which 
is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply(split('c).as('s)).select('a,'b,'c,'s)
+* }}}
+*/
+  def crossApply(udtf: TableFunctionCall[_]): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Cross Apply returns rows from the outer table (table on the left 
of the Apply operator)
+* that produce matching values from the table-valued function (which 
is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply("split('c') as (s)").select("a, b, c, s")
+* }}}
+*/
+  def crossApply(udtf: String): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Cross Apply returns rows from the outer table (table on the left 
of the Apply operator)
+* that produce matching values from the table-valued function (which 
is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.outerApply(split('c).as('s)).select('a,'b,'c,'s)
+* }}}
+*/
+  def outerApply(udtf: TableFunctionCall[_]): Table = {
+applyInternal(udtf, JoinType.LEFT_OUTER)
+  }
+
+  /**
+* The Outer Apply returns all the rows from the outer table (table on 
the left of the Apply
+* operator); for rows without matching values from the table-valued 
function (which
+* is on the right side of the operator), NULL values are displayed.
+*
+* The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
+*
+* Example:
+*
+* {{{
+*   val split = new MySplitUDTF()
+*   table.crossApply("split('c') as (s)").select("a, b, c, s")
+* }}}
+*/
+  def outerApply(udtf: String): Table = {
+applyInternal(udtf, JoinType.LEFT_OUTER)
+  }
+
+  private def applyInternal(udtfString: String, joinType: JoinType): Table = {
+val node = ExpressionParser.parseLogicalNode(udtfString)
+var alias: Option[Seq[Expression]] = None
+val functionCall = node match {
+  case AliasNode(aliasList, child) =>
+alias = Some(aliasList)
+child
+  case _ => node
+}
+
+functionCall match {
+  case call @ UnresolvedTableFunctionCall(name, args) =>
+val udtfCall = tableEnv.getFunctionCatalog.lookupTableFunction(name, args)
+if (alias.isDefined) {
+  applyInternal(udtfCall.as(alias.get: _*), joinType)
+} else {
+  applyInternal(udtfCall, joinType)
+}
+  case _ => throw new TableException("Cross/Outer Apply only accepts a TableFunction")
+}
+  }
+
+  private def applyInternal(node: LogicalNode, joinType: JoinType): Table = {
+node match {
+  case udtf: TableFunctionCall[_] =>
+udtf.setChild(this.logicalPlan)
+new Table(
+  tableEnv,
+  Join(this.logicalPlan, udtf.validate(tableEnv), joinType, None,
+   Some(relBuilder.getCluster.createCorrel())).validate(tableEnv))
--- End diff --

We kept Calcite code so far out of our `LogicalNode

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88344133
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+/**
+  * Flink RelNode which matches along with cross apply a user-defined table function.
+  */
+class DataSetCorrelate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+scan: LogicalTableFunctionScan,
+condition: RexNode,
+relRowType: RelDataType,
+joinRowType: RelDataType,
+joinType: SemiJoinType,
+ruleDescription: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkCorrelate
+  with DataSetRel {
+  override def deriveRowType() = relRowType
+
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(getInput) + 10
+planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataSetCorrelate(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  scan,
+  condition,
+  relRowType,
+  joinRowType,
+  joinType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+val rexCall = scan.getCall.asInstanceOf[RexCall]
+val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+correlateToString(rexCall, sqlFunction)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+val rexCall = scan.getCall.asInstanceOf[RexCall]
+val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+super.explainTerms(pw)
+  .item("lateral", correlateToString(rexCall, sqlFunction))
+  .item("select", selectToString(relRowType))
+  }
+
+
+  override def translateToPlan(tableEnv: BatchTableEnvironment,
+   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
--- End diff --

please indent similarly to the constructor parameters of this class
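
One way to read that request (a sketch only; the method body is elided):

    override def translateToPlan(
        tableEnv: BatchTableEnvironment,
        expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
      // ... body unchanged ...
    }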




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88339652
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
+
+import scala.collection.JavaConversions._
+
+/**
+  * cross/outer apply a user-defined table function
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(generator: CodeGenerator,
+  udtfTypeInfo: TypeInformation[Any],
+  rowType: RelDataType,
+  rexCall: RexCall,
+  condition: RexNode,
+  config: TableConfig,
+  joinType: SemiJoinType,
+  expectedType: Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+val crossResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
+  returnType, rowType.getFieldNames)
+
+val input2NullExprs = input2AccessExprs.map(
--- End diff --

I think `input2NullExpr` and `outerResultExpr` can be moved into the `else` 
branch of the `if (joinType == SemiJoinType.INNER)` condition.
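
Roughly (a sketch; `buildCrossBody`, `buildOuterBody`, and `toNullExpr` are
placeholders for the existing body-generation code, not names from the PR):

    if (joinType == SemiJoinType.INNER) {
      // cross apply: only crossResultExpr is needed on this path
      buildCrossBody(crossResultExpr)
    } else {
      // outer apply: the null-padded expressions are only used on this path
      val input2NullExprs = input2AccessExprs.map(toNullExpr) // toNullExpr: placeholder
      val outerResultExpr = generator.generateResultExpression(
        input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames)
      buildOuterBody(crossResultExpr, outerResultExpr)
    }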




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88336697
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses
--- End diff --

If this method is also used for `TableFunction`, the docs should be adapted.




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88338243
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
@@ -611,6 +612,130 @@ class Table(
   }
 
   /**
+* The Cross Apply returns rows from the outer table (table on the left of the Apply operator)
+* that produce matching values from the table-valued function (which is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply(split('c).as('s)).select('a,'b,'c,'s)
+* }}}
+*/
+  def crossApply(udtf: TableFunctionCall[_]): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Cross Apply returns rows from the outer table (table on the left of the Apply operator)
+* that produce matching values from the table-valued function (which is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.crossApply("split('c') as (s)").select("a, b, c, s")
+* }}}
+*/
+  def crossApply(udtf: String): Table = {
+applyInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+* The Cross Apply returns rows from the outer table (table on the left of the Apply operator)
+* that produce matching values from the table-valued function (which is on the right side of
+* the operator).
+*
+* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
+*
+* Example:
+*
+* {{{
+*   class MySplitUDTF extends TableFunction[String] {
+* def eval(str: String): Unit = {
+*   str.split("#").foreach(collect)
+* }
+*   }
+*
+*   val split = new MySplitUDTF()
+*   table.outerApply(split('c).as('s)).select('a,'b,'c,'s)
+* }}}
+*/
+  def outerApply(udtf: TableFunctionCall[_]): Table = {
+applyInternal(udtf, JoinType.LEFT_OUTER)
+  }
+
+  /**
+* The Outer Apply returns all the rows from the outer table (table on the left of the Apply
+* operator), and rows that do not match the condition from the table-valued function (which
+* is on the right side of the operator); NULL values are displayed.
+*
+* The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
+*
+* Example:
+*
+* {{{
+*   val split = new MySplitUDTF()
+*   table.crossApply("split('c') as (s)").select("a, b, c, s")
--- End diff --

`crossApply` should be `outerApply`. Please check the whole docs for this 
method.
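
Applied, the example in the `outerApply(String)` doc would read:

    val split = new MySplitUDTF()
    table.outerApply("split('c') as (s)").select("a, b, c, s")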




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88336271
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined table function works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
+  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
+  * If a user-defined table function should not introduce much overhead during runtime, it is
+  * recommended to declare parameters and result types as primitive types instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with EvaluableFunction {
+
+  private val rows: ListBuffer[T] = new ListBuffer
+
+  /**
+* Emit an output row
+*
+* @param row the output row
+*/
+  protected def collect(row: T): Unit = {
+// cache rows for now, maybe immediately process them further
+rows += row
+  }
+
+
+  @Internal
+  def getRowsIterator = rows.toIterator
+
+  @Internal
+  def clear() = rows.clear()
+
+  // this method will not be called, because we need to register multiple SQL functions at one time
+  override private[flink] final def createSqlFunction(
+  name: String,
+  typeFactory: FlinkTypeFactory)
+: SqlFunction = {
+throw new UnsupportedOperationException("this method should not be called")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+* Returns the result type of the evaluation method with a given signature.
+*
+* This method needs to be overridden in case Flink's type extraction facilities are not
+* sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
+* method. Flink's type extraction facilities can handle basic types or
+* simple POJOs but might be wrong for more complex, custom, or composite types.
+*
+* @return [[TypeInformation]] of result type or null if Flink should determine the type
+*/
+  def getResultType: TypeInformation[T] = null
+
+  /**
+* Returns [[TypeInformation]] about the operands of the evaluation method with a given
+* signature.
+*
+* In order to perform operand type inference in SQL (especially when NULL is used) it might be
+   

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r87304840
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
@@ -360,7 +370,8 @@ case class Join(
 left: LogicalNode,
 right: LogicalNode,
 joinType: JoinType,
-condition: Option[Expression]) extends BinaryNode {
+condition: Option[Expression],
+corId: Option[CorrelationId] = None) extends BinaryNode {
--- End diff --

CorrelationId is a Calcite class. At this point we do all validation 
ourselves. Can we replace it?
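
One possible direction (illustrative only, not part of the PR): keep a plain
flag in the logical node and create the Calcite `CorrelationId` only while
translating to a `RelNode`:

    case class Join(
        left: LogicalNode,
        right: LogicalNode,
        joinType: JoinType,
        condition: Option[Expression],
        correlated: Boolean = false) // hypothetical replacement for corId
      extends BinaryNode {
      // translation could then call relBuilder.getCluster.createCorrel()
      // when correlated is true, keeping validation free of Calcite classes
    }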




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r87303197
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala ---
@@ -305,6 +305,16 @@ object ScalarFunctions {
 )
   )
 
+// user-defined table function
+case tsf: TableSqlFunction =>
--- End diff --

Should we rename this class (`ScalarFunctions`) because it contains also 
`TableFunction` logic?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88335748
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined table function works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
+  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
+  * If a user-defined table function should not introduce much overhead during runtime, it is
+  * recommended to declare parameters and result types as primitive types instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with EvaluableFunction {
+
+  private val rows: ListBuffer[T] = new ListBuffer
+
+  /**
+* Emit an output row
+*
+* @param row the output row
+*/
+  protected def collect(row: T): Unit = {
+// cache rows for now, maybe immediately process them further
+rows += row
+  }
+
+
+  @Internal
--- End diff --

Remove `@Internal` annotation. Annotations are only used in specific Maven 
modules (flink-core, flink-java, flink-scala, ...) but not yet in flink-table
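
That is (sketch, with explicit return types added for clarity):

    def getRowsIterator: Iterator[T] = rows.toIterator

    def clear(): Unit = rows.clear()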




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r88335539
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined table function works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
+  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
--- End diff --

Maybe add a short code example for `eval()` here?
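
For instance (a sketch, reusing the split UDTF from the `crossApply` docs):

    // Splits the input string at '#' and emits one row per token.
    class MySplitUDTF extends TableFunction[String] {
      def eval(str: String): Unit = {
        str.split("#").foreach(collect)
      }
    }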



