[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

2018-09-17 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r218205054
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -749,44 +748,48 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 // check that sink table exists
 if (null == sinkTableName) throw TableException("Name of TableSink must 
not be null.")
 if (sinkTableName.isEmpty) throw TableException("Name of TableSink must 
not be empty.")
-if (!isRegistered(sinkTableName)) {
-  throw TableException(s"No table was registered under the name 
$sinkTableName.")
-}
 
 getTable(sinkTableName) match {
 
   // check for registered table that wraps a sink
-  case s: TableSourceSinkTable[_, _] if s.tableSinkTable.isDefined =>
-val tableSink = s.tableSinkTable.get.tableSink
-// validate schema of source table and table sink
-val srcFieldTypes = table.getSchema.getTypes
-val sinkFieldTypes = tableSink.getFieldTypes
-
-if (srcFieldTypes.length != sinkFieldTypes.length ||
-  srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF 
!= snkF}) {
-
-  val srcFieldNames = table.getSchema.getColumnNames
-  val sinkFieldNames = tableSink.getFieldNames
-
-  // format table and table sink schema strings
-  val srcSchema = srcFieldNames.zip(srcFieldTypes)
-.map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
-.mkString("[", ", ", "]")
-  val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
-.map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
-.mkString("[", ", ", "]")
-
-  throw ValidationException(
-s"Field types of query result and registered TableSink 
$sinkTableName do not match.\n" +
-  s"Query result schema: $srcSchema\n" +
-  s"TableSink schema:$sinkSchema")
-}
+  case Some(s: TableSourceSinkTable[_, _]) => s.tableSinkTable match {
 
 Review comment:
   We can also check with `case None => ` whether we found a table or not and 
avoid nesting `match` blocks.
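
A minimal sketch of the flat `match` this comment seems to point at, assuming simplified stand-in types: `SinkWrapper` plays the role of `TableSourceSinkTable` and `PlainTable` is any other table. Both are hypothetical, as is the wording of the last error message.

```scala
// Hypothetical stand-ins for the Calcite/Flink table types; not the real classes.
sealed trait Table
final case class SinkWrapper(sink: Option[String]) extends Table
final case class PlainTable(name: String) extends Table

object FlatMatch {
  // One flat match: the "not found" case sits next to the other cases, so
  // neither a separate isRegistered check nor a nested match is needed.
  def resolveSink(lookup: Option[Table], sinkTableName: String): Either[String, String] =
    lookup match {
      case None =>
        Left(s"No table was registered under the name $sinkTableName.")
      case Some(SinkWrapper(Some(sink))) =>
        Right(sink)
      case Some(_) =>
        // Illustrative message only; the real one may differ.
        Left(s"The table registered as $sinkTableName is not a TableSink.")
    }
}
```

Nested extractor patterns such as `Some(SinkWrapper(Some(sink)))` replace the inner `match` on `tableSinkTable`.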


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

2018-09-17 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r218205252
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -828,12 +831,28 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 rootSchema.getTableNames.contains(name)
   }
 
-  protected def getTable(name: String): org.apache.calcite.schema.Table = {
-rootSchema.getTable(name)
-  }
+  /**
+* Get a table from either internal or external catalogs.
+*
+* @param name The name of the table.
+* @return The table registered either internally or externally, None 
otherwise.
+*/
+  protected def getTable(name: String): 
Option[org.apache.calcite.schema.Table] = {
+val internalTable = rootSchema.getTable(name)
 
-  protected def getRowType(name: String): RelDataType = {
-rootSchema.getTable(name).getRowType(typeFactory)
+if (internalTable != null) {
+  Some(internalTable)
+} else {
+  // search external catalogs
+  var (subSchema, tableName) = (rootSchema, name)
 
 Review comment:
   We can use `val` and a recursive internal function to avoid the `var`.
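
A rough sketch of what such a recursive helper could look like. `Schema` is a hypothetical stand-in for the Calcite schema, with string values in place of real tables; it returns `null` for missing entries to mirror the Java-style API in the diff.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for a Calcite schema; lookups return null when the
// entry is missing, mirroring the Java-style API used in the diff.
final case class Schema(subSchemas: Map[String, Schema], tables: Map[String, String]) {
  def getSubSchema(name: String): Schema = subSchemas.getOrElse(name, null)
  def getTable(name: String): String = tables.getOrElse(name, null)
}

object RecursiveLookup {
  def getTable(root: Schema, name: String): Option[String] = {
    // Consumes one path segment per call, so no mutable state is needed.
    @tailrec
    def lookup(schema: Schema, path: String): Option[String] =
      path.split("\\.", 2) match {
        case Array(subcatalog, rest) =>
          val sub = schema.getSubSchema(subcatalog)
          if (sub == null) None else lookup(sub, rest)
        case _ =>
          Option(schema.getTable(path))
      }

    // Internal tables take precedence, as in the diff; orElse only runs the
    // external search when the internal lookup returned null.
    Option(root.getTable(name)).orElse(lookup(root, name))
  }
}
```

The `@tailrec` annotation makes the compiler verify that the helper compiles to a loop, so the recursion costs nothing over the `while` version.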




[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

2018-09-17 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r218205508
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -828,12 +831,28 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 rootSchema.getTableNames.contains(name)
   }
 
-  protected def getTable(name: String): org.apache.calcite.schema.Table = {
-rootSchema.getTable(name)
-  }
+  /**
+* Get a table from either internal or external catalogs.
+*
+* @param name The name of the table.
+* @return The table registered either internally or externally, None 
otherwise.
+*/
+  protected def getTable(name: String): 
Option[org.apache.calcite.schema.Table] = {
+val internalTable = rootSchema.getTable(name)
 
-  protected def getRowType(name: String): RelDataType = {
-rootSchema.getTable(name).getRowType(typeFactory)
+if (internalTable != null) {
+  Some(internalTable)
+} else {
+  // search external catalogs
+  var (subSchema, tableName) = (rootSchema, name)
+  while (tableName.contains(".")) {
+val Array(subcatalog, table) = tableName.split("\\.", 2)
+subSchema = subSchema.getSubSchema(subcatalog)
 
 Review comment:
   We need to check for `subSchema != null` to prevent an NPE if a subschema 
does not exist.
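
One possible way to guard each step, sketched with a hypothetical `Catalog` class in place of the real schema type: wrapping the nullable result in `Option` turns a missing sub-schema into `None` rather than an NPE on the next call.

```scala
// Hypothetical stand-in for a schema with a Java-style nullable lookup.
final class Catalog(subs: Map[String, Catalog]) {
  def getSubSchema(name: String): Catalog = subs.getOrElse(name, null)
}

object NullSafeStep {
  // Option(x) is None when x is null, so a missing sub-schema ends the
  // traversal instead of throwing on the next getSubSchema call.
  def descend(root: Catalog, path: Seq[String]): Option[Catalog] =
    path.foldLeft(Option(root)) { (current, segment) =>
      current.flatMap(c => Option(c.getSubSchema(segment)))
    }
}
```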




[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

2018-09-15 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r217895759
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -820,20 +820,59 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 
   /**
 * Checks if a table is registered under the given name.
+* Internal and external catalogs are both checked.
 *
 * @param name The table name to check.
 * @return true, if a table is registered under the name, false otherwise.
 */
   protected[flink] def isRegistered(name: String): Boolean = {
 
 Review comment:
   Keep `isRegistered` as it is. It is used by other methods as well.
   We should change the logic in `insertInto` to call `getTable()` directly. 
This will also improve performance, because we only need to talk to the 
external catalog once (it might be backed by an external service).
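
To illustrate the performance point, here is a small sketch with a hypothetical `CountingCatalog` that counts how often it is queried. It is not the Flink catalog API; the counter only stands in for round trips to an external service.

```scala
// Hypothetical stand-in for an external catalog that counts remote calls.
final class CountingCatalog(tables: Map[String, String]) {
  var calls = 0
  def getTable(name: String): Option[String] = { calls += 1; tables.get(name) }
  def isRegistered(name: String): Boolean = { calls += 1; tables.contains(name) }
}

object SingleLookup {
  // Check-then-fetch: two round trips for every successful lookup.
  def viaIsRegistered(c: CountingCatalog, name: String): Option[String] =
    if (c.isRegistered(name)) c.getTable(name) else None

  // Fetch once and let the Option carry the "not found" case: one round trip.
  def viaGetTable(c: CountingCatalog, name: String): Option[String] =
    c.getTable(name)
}
```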




[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

2018-09-15 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r217895771
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -820,20 +820,59 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 
   /**
 * Checks if a table is registered under the given name.
+* Internal and external catalogs are both checked.
 *
 * @param name The table name to check.
 * @return true, if a table is registered under the name, false otherwise.
 */
   protected[flink] def isRegistered(name: String): Boolean = {
-rootSchema.getTableNames.contains(name)
+var isRegistered = rootSchema.getTableNames.contains(name)
+
+// check if the table exists in external catalogs
+if (!isRegistered) {
+  val (externalSchema, externalTableName) = resolveExternalTable(name)
+  if (externalSchema != null) {
+isRegistered = externalSchema.getTableNames.contains(externalTableName)
+  }
+}
+
+return isRegistered
   }
 
   protected def getTable(name: String): org.apache.calcite.schema.Table = {
-rootSchema.getTable(name)
+var table = rootSchema.getTable(name)
+
+// check if the table exists in external catalogs
+if (table == null) {
+  val (externalSchema, externalTableName) = resolveExternalTable(name)
+  if (externalSchema != null) {
+table = externalSchema.getTable(externalTableName)
+  }
+}
+
+return table
 
 Review comment:
   Scala discourages the use of `return`. The result of the last expression of 
a function is its return value.
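
A small sketch of `getTable` written without `return`. The two lookup functions are hypothetical stand-ins for `rootSchema.getTable` and the external catalog lookup, with strings in place of real tables.

```scala
object LastExpression {
  // Hypothetical nullable lookups standing in for rootSchema.getTable and the
  // external catalog lookup in the diff.
  def internalLookup(name: String): String =
    if (name == "internal") "internalTable" else null
  def externalLookup(name: String): String =
    if (name == "ext.t") "externalTable" else null

  // The if/else is an expression and is the last one in the body, so its
  // value is the method's result; no `return` is needed.
  def getTable(name: String): String = {
    val table = internalLookup(name)
    if (table != null) table else externalLookup(name)
  }
}
```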




[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

2018-09-15 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r217896170
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -820,20 +820,59 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 
   /**
 * Checks if a table is registered under the given name.
+* Internal and external catalogs are both checked.
 *
 * @param name The table name to check.
 * @return true, if a table is registered under the name, false otherwise.
 */
   protected[flink] def isRegistered(name: String): Boolean = {
-rootSchema.getTableNames.contains(name)
+var isRegistered = rootSchema.getTableNames.contains(name)
+
+// check if the table exists in external catalogs
+if (!isRegistered) {
+  val (externalSchema, externalTableName) = resolveExternalTable(name)
+  if (externalSchema != null) {
+isRegistered = externalSchema.getTableNames.contains(externalTableName)
+  }
+}
+
+return isRegistered
   }
 
   protected def getTable(name: String): org.apache.calcite.schema.Table = {
-rootSchema.getTable(name)
+var table = rootSchema.getTable(name)
+
+// check if the table exists in external catalogs
+if (table == null) {
+  val (externalSchema, externalTableName) = resolveExternalTable(name)
+  if (externalSchema != null) {
+table = externalSchema.getTable(externalTableName)
+  }
+}
+
+return table
   }
 
   protected def getRowType(name: String): RelDataType = {
 
 Review comment:
   The method does not seem to be used and can be removed.




[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

2018-09-15 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r217895776
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -820,20 +820,59 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 
   /**
 * Checks if a table is registered under the given name.
+* Internal and external catalogs are both checked.
 *
 * @param name The table name to check.
 * @return true, if a table is registered under the name, false otherwise.
 */
   protected[flink] def isRegistered(name: String): Boolean = {
-rootSchema.getTableNames.contains(name)
+var isRegistered = rootSchema.getTableNames.contains(name)
+
+// check if the table exists in external catalogs
+if (!isRegistered) {
+  val (externalSchema, externalTableName) = resolveExternalTable(name)
+  if (externalSchema != null) {
+isRegistered = externalSchema.getTableNames.contains(externalTableName)
+  }
+}
+
+return isRegistered
   }
 
   protected def getTable(name: String): org.apache.calcite.schema.Table = {
-rootSchema.getTable(name)
+var table = rootSchema.getTable(name)
 
 Review comment:
   Use `val` when possible.
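
One way to get rid of the `var` here, sketched with hypothetical `internal` and `external` lookup functions in place of the root schema and the external catalog: chain the two lookups with `Option.orElse` so the result is assigned exactly once.

```scala
object PreferVal {
  // `internal` and `external` are hypothetical nullable lookups standing in
  // for the root schema and the external catalog.
  def getTable(
      name: String,
      internal: String => String,
      external: String => String): Option[String] = {
    // Assigned exactly once: orElse evaluates the external lookup only when
    // the internal one returned null.
    val table = Option(internal(name)).orElse(Option(external(name)))
    table
  }
}
```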




[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

2018-09-15 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r217896469
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -820,20 +820,59 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 
   /**
 * Checks if a table is registered under the given name.
+* Internal and external catalogs are both checked.
 *
 * @param name The table name to check.
 * @return true, if a table is registered under the name, false otherwise.
 */
   protected[flink] def isRegistered(name: String): Boolean = {
-rootSchema.getTableNames.contains(name)
+var isRegistered = rootSchema.getTableNames.contains(name)
+
+// check if the table exists in external catalogs
+if (!isRegistered) {
+  val (externalSchema, externalTableName) = resolveExternalTable(name)
+  if (externalSchema != null) {
+isRegistered = externalSchema.getTableNames.contains(externalTableName)
+  }
+}
+
+return isRegistered
   }
 
   protected def getTable(name: String): org.apache.calcite.schema.Table = {
-rootSchema.getTable(name)
+var table = rootSchema.getTable(name)
+
+// check if the table exists in external catalogs
+if (table == null) {
+  val (externalSchema, externalTableName) = resolveExternalTable(name)
+  if (externalSchema != null) {
+table = externalSchema.getTable(externalTableName)
+  }
+}
+
+return table
   }
 
   protected def getRowType(name: String): RelDataType = {
-rootSchema.getTable(name).getRowType(typeFactory)
+val table = getTable(name)
+if (table != null) {
+  table.getRowType(typeFactory)
+}
+
+return null
+  }
+
+  protected def resolveExternalTable(name: String): (SchemaPlus, String) = {
 
 Review comment:
   This function can be inlined into `getTable()`.




[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Support external sink table in the INSERT INTO clause

2018-09-15 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r217897374
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -820,20 +820,59 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 
   /**
 * Checks if a table is registered under the given name.
+* Internal and external catalogs are both checked.
 *
 * @param name The table name to check.
 * @return true, if a table is registered under the name, false otherwise.
 */
   protected[flink] def isRegistered(name: String): Boolean = {
-rootSchema.getTableNames.contains(name)
+var isRegistered = rootSchema.getTableNames.contains(name)
+
+// check if the table exists in external catalogs
+if (!isRegistered) {
+  val (externalSchema, externalTableName) = resolveExternalTable(name)
+  if (externalSchema != null) {
+isRegistered = externalSchema.getTableNames.contains(externalTableName)
+  }
+}
+
+return isRegistered
   }
 
   protected def getTable(name: String): org.apache.calcite.schema.Table = {
 
 Review comment:
   The method should return `Option[org.apache.calcite.schema.Table]`. We need 
to adjust `StreamTableEnvironment` and `BatchTableEnvironment` for this so that 
they do not wrap the result of the `getTable()` calls in `Option` a second time.
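
A short sketch of why the second wrapping matters, using a hypothetical in-memory map in place of the schema lookup. `Option(x)` is only `None` when `x` is `null`, so wrapping an `Option` again produces `Some(None)` for a missing table.

```scala
object OptionResult {
  // Hypothetical registry standing in for the schema lookup.
  private val tables = Map("orders" -> "ordersTable")

  // The method already returns an Option, as the comment proposes.
  def getTable(name: String): Option[String] = tables.get(name)

  // Wrapping again yields Option[Option[String]]: Some(None) for a missing
  // table, so a caller that only checks isDefined would wrongly see a hit.
  def wrappedAgain(name: String): Option[Option[String]] = Option(getTable(name))
}
```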

