UpsertSink 仍然可能会收到 delete 消息的,所以你可以看到 UpsertSink 的输入是 Tuple2<Boolean, Row> ,当 false 时代表 delelte,true 时代表 upsert 消息。
Best, Jark On Tue, 28 Apr 2020 at 14:05, 1101300123 <hdxg1101300...@163.com> wrote: > 我看源码这样写道: > /** > * Get dialect upsert statement, the database has its own upsert syntax, > such as Mysql > * using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO > UPDATE SET.. > * > * @return None if dialect does not support upsert statement, the writer > will degrade to > * the use of select + update/insert, this performance is poor. > */ > default Optional<String> getUpsertStatement( > String tableName, String[] fieldNames, String[] uniqueKeyFields) { > return Optional.empty(); > } > 不同的数据库产品有不同的语句,所以默认实现是delete +insert > > > 但是我看 > @Override > public void executeBatch() throws SQLException { > if (keyToRows.size() > 0) { > for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { > Row pk = entry.getKey(); > Tuple2<Boolean, Row> tuple = entry.getValue(); > if (tuple.f0) { > processOneRowInBatch(pk, tuple.f1); > } else { > setRecordToStatement(deleteStatement, pkTypes, pk); > deleteStatement.addBatch(); > } > } > internalExecuteBatch(); > deleteStatement.executeBatch(); > keyToRows.clear(); > } > } > > > 方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录 > 在2020年4月28日 11:43,wangl...@geekplus.com.cn<wangl...@geekplus.com.cn> 写道: > > Thanks Leonard, > > JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO ON > DUPLICATE KEY 吗? > 这个在源代码哪个地方呢? > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > > > 发件人: Leonard Xu > 发送时间: 2020-04-27 12:58 > 收件人: user-zh > 主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题 > Hi,wanglei > > INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1 > 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。 > 这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统 > 需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, > RetractStreamSink) > > 我看 > https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc > 没有 Retract 方式 > 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗? > 现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract. > 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, > 也不支持retract。 > > 如若不带 group by 直接: > INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src > 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢? > > 不带 group by时无法推导出query的 unique key,没法做按照unique key的更新, > 只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可 > > Best, > > Leonard Xu >