Hi Jing:
Thanks for your response and example.
Is there a DataStream api for using the upsert functionality ?
Also, is there any reason for why the TableJdbcUpsertOutputFormat constructors 
are not public ? 
Thanks again for your help.
Mans
    On Monday, October 18, 2021, 12:30:36 AM EDT, JING ZHANG 
<beyond1...@gmail.com> wrote:  
 
 Hi,If you need JDBC upsert functionality, it's easier to implement app using 
Flink SQL. You could use JDBC Table Connector [1]. You could define primary key 
in DDL when writing data to external database. See CREATE TABLE DDL for more 
details about PRIMARY KEY syntax.I find an example in 
`JdbcUpsertTableSinkITCase` of flink-connector-jdbc, hope this helps.
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table t =
        tEnv.fromDataStream(
                get4TupleDataStream(env)
                        .assignTimestampsAndWatermarks(
                                new AscendingTimestampExtractor<
                                        Tuple4<Integer, Long, String, 
Timestamp>>() {
                                    @Override
                                    public long extractAscendingTimestamp(
                                            Tuple4<Integer, Long, String, 
Timestamp>
                                                    element) {
                                        return element.f0;
                                    }
                                }),
                $("id"),
                $("num"),
                $("text"),
                $("ts"));

tEnv.createTemporaryView("T", t);
tEnv.executeSql(
        "CREATE TABLE upsertSink ("
                + "  cnt BIGINT,"
                + "  lencnt BIGINT,"
                + "  cTag INT,"
                + "  ts TIMESTAMP(3)"
                + ") WITH ("
                + "  'connector.type'='jdbc',"
                + "  'connector.url'='XXXX',"
                + "  'connector.table'='upsertSink'"
                + ")");

tEnv.executeSql(
                "INSERT INTO upsertSink \n"
                        + "SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS 
ts\n"
                        + "FROM (\n"
                        + "  SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS 
ts\n"
                        + "  FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE 
WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n"
                        + "  GROUP BY len, cTag\n"
                        + ")\n"
                        + "GROUP BY cnt, cTag")
        .await();
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#key-handling[2]
 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-table
Best,JING ZHANG
M Singh <mans2si...@yahoo.com> 于2021年10月17日周日 上午12:59写道:

Hi Folks:
I am working on Flink DataStream pipeline and would like to use JDBC upsert 
functionality.  I found a class TableJdbcUpsertOutputFormat but am not sure who 
to use it with the JdbcSink as shown in the document 
(https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/connector/jdbc/JdbcSink.html).
 
I could not find how to pass OutputFormat argument to the JDBC sink.
Please let me know if there is any documentation or example for using JDBC sink 
with upsert for DataStreams.
Thanks


 

  

Reply via email to