[
https://issues.apache.org/jira/browse/FLINK-12337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-12337:
-----------------------------------
Labels: auto-unassigned pull-request-available (was:
pull-request-available stale-assigned)
> [TableSQL/Planner] InsertInto method should configure TableSink
> ----------------------------------------------------------------
>
> Key: FLINK-12337
> URL: https://issues.apache.org/jira/browse/FLINK-12337
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Reporter: Artsem Semianenka
> Assignee: Artsem Semianenka
> Priority: Major
> Labels: auto-unassigned, pull-request-available
>
> In the current implementation of TableEnvironment the
> [insertInto(..)|https://github.com/apache/flink/blob/7347c9e2094c5243ef7db34bde1fd87ea5e7641d/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala#L656]
> method check is the source Table Schema exactly the same as sink schema
> otherwise, it throws a ValidationException.
> Let's imagine the following setup:
> I have a sink wich produce the Row with fields A and B. (for example Kafka)
> By the other hand, I have an External Catalog which produce the sink for
> Database which support UPSERT (for example PostgreSQL or Cloudera Kudu in my
> case) with schema of fields :
> * A _required_
> * B _required_
> * C _+optional+_
> I want to UPSERT only A and B fields in the sink. But as far as sink was
> created from the external catalog it knows only the schema of the table.
> (fields A B C) and when I create the query like this I got the
> ValidationException.
> {code:java}
> INSERT INTO sink.table
> SELECT A, B FROM src.topic{code}
> I propose to validate in *insertInto(..)* method does the source schema is
> the subset of sink schema and if yes call the configure(..) method for the
> sink. In this case, the sink can adapt for source schema if it is possible.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)