Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3829#discussion_r136118894
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
         *   tEnv.sql(s"SELECT * FROM $table")
         * }}}
         *
    -    * @param query The SQL query to evaluate.
    +    * @param sql The SQL string to evaluate.
         * @return The result of the query as Table.
         */
    -  def sql(query: String): Table = {
    +  @deprecated("use [[sqlQuery()]] instead")
    +  def sql(sql: String): Table = {
    +    sqlQuery(sql)
    +  }
    +
    +  /**
    +    * Evaluates a SQL Select query on registered tables and retrieves the 
result as a
    +    * [[Table]].
    +    *
    +    * All tables referenced by the query must be registered in the 
TableEnvironment. But
    +    * [[Table.toString]] will automatically register an unique table name 
and return the
    +    * table name. So it allows to call SQL directly on tables like this:
    +    *
    +    * {{{
    +    *   val table: Table = ...
    +    *   // the table is not registered to the table environment
    +    *   tEnv.sqlSelect(s"SELECT * FROM $table")
    +    * }}}
    +    *
    +    * @param sql The SQL string to evaluate.
    +    * @return The result of the query as Table or null of the DML insert 
operation.
    +    */
    +  def sqlQuery(sql: String): Table = {
    +    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
    +    // parse the sql query
    +    val parsed = planner.parse(sql)
    +    if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
    +      // validate the sql query
    +      val validated = planner.validate(parsed)
    +      // transform to a relational tree
    +      val relational = planner.rel(validated)
    +      new Table(this, LogicalRelNode(relational.rel))
    +    } else {
    +      throw new TableException(
    +        "Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
    +          "WITH, ORDER_BY, EXPLICIT_TABLE")
    +    }
    +  }
    +
    +  /**
    +    * Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
    +    * such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
    +    * statement;
    +    * Currently only support a SQL INSERT statement on registered tables 
and has no return value.
    +    *
    +    * All tables referenced by the query must be registered in the 
TableEnvironment. But
    +    * [[Table.toString]] will automatically register an unique table name 
and return the
    +    * table name. So it allows to call SQL directly on tables like this:
    +    *
    +    * {{{
    +    *   /// register table sink for insertion
    +    *   tEnv.registerTableSink("target_table", ...
    +    *   val sourceTable: Table = ...
    +    *   // sourceTable is not registered to the table environment
    +    *   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
    +    * }}}
    +    *
    +    * @param sql The SQL String to evaluate.
    +    */
    +  def sqlUpdate(sql: String): Unit = {
    +    sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
    +  }
    +
    +  /**
    +    * Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
    +    * such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
    +    * statement;
    +    * Currently only support a SQL INSERT statement on registered tables 
and has no return value.
    +    *
    +    * All tables referenced by the query must be registered in the 
TableEnvironment. But
    +    * [[Table.toString]] will automatically register an unique table name 
and return the
    +    * table name. So it allows to call SQL directly on tables like this:
    +    *
    +    * {{{
    +    *   /// register table sink for insertion
    +    *   tEnv.registerTableSink("target_table", ...
    +    *   val sourceTable: Table = ...
    +    *   // sourceTable is not registered to the table environment
    +    *   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
    +    * }}}
    +    *
    +    * @param sql The SQL String to evaluate.
    +    * @param config The [[QueryConfig]] to use.
    +    */
    +  def sqlUpdate(sql: String, config: QueryConfig): Unit = {
         val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
         // parse the sql query
    -    val parsed = planner.parse(query)
    -    // validate the sql query
    -    val validated = planner.validate(parsed)
    -    // transform to a relational tree
    -    val relational = planner.rel(validated)
    +    val parsed = planner.parse(sql)
    +    parsed match {
    +      case insert: SqlInsert => {
    +        // validate the sql query
    +        planner.validate(parsed)
    +
    +        // validate sink table
    +        val targetName = 
insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
    +        val targetTable = getTable(targetName)
    +        if (null == targetTable || 
!targetTable.isInstanceOf[TableSinkTable[_]]) {
    +          throw new TableException("SQL INSERT operation need a registered 
TableSink Table!")
    +        }
    +        // validate unsupported partial insertion to sink table
    +        val sinkTable = targetTable.asInstanceOf[TableSinkTable[_]]
    +        if (null != insert.getTargetColumnList && 
insert.getTargetColumnList.size() !=
    +          sinkTable.fieldTypes.length) {
    +          throw new TableException(
    +            "SQL INSERT do not support insert partial columns of the 
target table due to table " +
    +              "columns haven’t nullable property definition for now!")
    +        }
     
    -    new Table(this, LogicalRelNode(relational.rel))
    +        writeToSink(
    +          new Table(this, 
LogicalRelNode(planner.rel(insert.getSource).rel)),
    +          sinkTable.tableSink,
    +          config)
    +      }
    +      case _ =>
    +        throw new TableException("Unsupported sql query! sqlUpdate only 
accept INSERT currently.")
    --- End diff --
    
    Change to:
    > Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to