frank wang created FLINK-10851: ---------------------------------- Summary: sqlUpdate support complex insert grammar Key: FLINK-10851 URL: https://issues.apache.org/jira/browse/FLINK-10851 Project: Flink Issue Type: Bug Reporter: frank wang
my code is {{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}} but flink give me error info, said kafka "No table was registered under the name kafka" i modify the code ,that is ok now TableEnvironment.scala {code:java} def sqlUpdate(stmt: String, config: QueryConfig): Unit = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(stmt) parsed match { case insert: SqlInsert => // validate the SQL query val query = insert.getSource val validatedQuery = planner.validate(query) // get query result as Table val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel)) // get name of sink table val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) // insert query result into sink table insertInto(queryResult, targetTableName, config) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.") } } {code} should modify to this {code:java} def sqlUpdate(stmt: String, config: QueryConfig): Unit = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(stmt) parsed match { case insert: SqlInsert => // validate the SQL query val query = insert.getSource val validatedQuery = planner.validate(query) // get query result as Table val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel)) // get name of sink table //val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) val targetTableName = insert.getTargetTable.toString // insert query result into sink table insertInto(queryResult, targetTableName, config) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.") } } {code} i hope this can be acceptted, thx -- This message was sent by Atlassian JIRA (v7.6.3#76005)