flink-1.11 在 windows 下怎样启动
我看 flink-1.11 发布包 bin 目录没有 windows 启动所需的 .bat 文件了。 那在 windows 下怎样启动呢? 谢谢 王磊 wangl...@geekplus.com.cn
Re: Re: flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到 kafka
谢谢,我直接更改了 KafkaDynamicSinkBase 的 getChangelogMode 方法, 是可以实现目的的。 更改前: public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { return this.encodingFormat.getChangelogMode(); } 更改后:public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { return ChangelogMode.newBuilder() .addContainedKind(RowKind.INSERT) .addContainedKind(RowKind.UPDATE_AFTER) .build(); }而且这样更改以后 UPDATE_BEFORE 的记录被过滤掉了,没有被发送到 Kafka 谢谢,王磊 wangl...@geekplus.com.cn Sender: Benchao Li Send Time: 2020-07-17 20:18 Receiver: user-zh Subject: Re: flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到 kafka DynamicTableSink有一个方法是getChangelogMode,可以通过这个方法来指定这个sink接收什么种类的数据 wangl...@geekplus.com.cn 于2020年7月17日周五 下午1:02写道: > > INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*) > from kafka_ods_artemis_out_order group by warehouse_id; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: Table sink > 'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming > update changes which is produced by node > GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS > EXPR$1]) > > 在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。 > > 我看现在 Flink-1.11 中是用了 KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让 > GroupBy 的结果也发送到 Kafka 呢? > > 谢谢, > 王磊 > > > wangl...@geekplus.com.cn > > -- Best, Benchao Li
flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到 kafka
INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*) from kafka_ods_artemis_out_order group by warehouse_id; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Table sink 'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS EXPR$1]) 在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。 我看现在 Flink-1.11 中是用了 KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让 GroupBy 的结果也发送到 Kafka 呢? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
谢谢,我理解了。 wangl...@geekplus.com.cn Sender: Harold.Miao Send Time: 2020-07-16 19:33 Receiver: user-zh Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? 我的理解 : 大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class 类似下面的代码 private static T findSingleInternal( Class factoryClass, Map properties, Optional classLoader) { List tableFactories = discoverFactories(classLoader); List filtered = filter(tableFactories, factoryClass, properties); if (filtered.size() > 1) { throw new AmbiguousTableFactoryException( filtered, factoryClass, tableFactories, properties); } else { return filtered.get(0); } } private static List discoverFactories(Optional classLoader) { try { List result = new LinkedList<>(); ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader()); ServiceLoader .load(TableFactory.class, cl) .iterator() .forEachRemaining(result::add); return result; } catch (ServiceConfigurationError e) { LOG.error("Could not load service provider for table factories.", e); throw new TableException("Could not load service provider for table factories.", e); } } wangl...@geekplus.com.cn 于2020年7月16日周四 下午7:04写道: > > 我在 > flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory > 找到了 SPI 的配置: > > org.apache.flink.formats.json.JsonFileSystemFormatFactory > org.apache.flink.formats.json.JsonFormatFactory > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory > org.apache.flink.formats.json.canal.CanalJsonFormatFactory > > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep > 代码没找到类似的关系映射配置。 > > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > > > Sender: godfrey he > Send Time: 2020-07-16 16:38 > Receiver: user-zh > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1] > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors > > Best, > Godfrey > > wangl...@geekplus.com.cn 于2020年7月16日周四 > 下午4:02写道: > > > 比如: > > > > CREATE TABLE my_table ( > > id BIGINT, > > first_name STRING, > > last_name STRING, > > email STRING > > ) WITH ( > > 'connector'='kafka', > > 'topic'='user_topic', > > 'properties.bootstrap.servers'='localhost:9092', > > 'scan.startup.mode'='earliest-offset', > > 'format'='debezium-json' > > ); > > > > 最终解析 debezium-json 应该是 > > > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > > 下面的代码 > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > > > 谢谢, > > 王磊 > > > > > > wangl...@geekplus.com.cn > > > > > -- Best Regards, Harold Miao
Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
我在 flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory 找到了 SPI 的配置: org.apache.flink.formats.json.JsonFileSystemFormatFactory org.apache.flink.formats.json.JsonFormatFactory org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory org.apache.flink.formats.json.canal.CanalJsonFormatFactory 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 代码没找到类似的关系映射配置。 谢谢, 王磊 wangl...@geekplus.com.cn Sender: godfrey he Send Time: 2020-07-16 16:38 Receiver: user-zh Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors Best, Godfrey wangl...@geekplus.com.cn 于2020年7月16日周四 下午4:02写道: > 比如: > > CREATE TABLE my_table ( > id BIGINT, > first_name STRING, > last_name STRING, > email STRING > ) WITH ( > 'connector'='kafka', > 'topic'='user_topic', > 'properties.bootstrap.servers'='localhost:9092', > 'scan.startup.mode'='earliest-offset', > 'format'='debezium-json' > ); > > 最终解析 debezium-json 应该是 > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > 下面的代码 > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > 谢谢, > 王磊 > > > wangl...@geekplus.com.cn > >
Re: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval
直接在 flink-conf.yaml 文件中加配置 execution.checkpointing.interval: 6 wangl...@geekplus.com.cn Sender: Harold.Miao Send Time: 2020-07-16 13:27 Receiver: user-zh Subject: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval hi flink users 通过sql-client提交sql怎么设置checkpointing.interval? 我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。 谢谢 -- Best Regards, Harold Miao
FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
比如: CREATE TABLE my_table ( id BIGINT, first_name STRING, last_name STRING, email STRING ) WITH ( 'connector'='kafka', 'topic'='user_topic', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset', 'format'='debezium-json' ); 最终解析 debezium-json 应该是 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 下面的代码 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? 谢谢, 王磊 wangl...@geekplus.com.cn
回复: FlinkSQL 入到 MySQL后汉字乱码
是 MySQL_tableB 所在的 server 端字符设置有问题。 配置中加上下面的配置就好了。 [mysqld] character-set-server=utf8 [client] default-character-set=utf8 [mysql] default-character-set=utf8 wangl...@geekplus.com.cn 发件人: wangl...@geekplus.com.cn 发送时间: 2020-07-15 16:34 收件人: user-zh 主题: FlinkSQL 入到 MySQL后汉字乱码 KafkaTable:kafka 消息 MySQL_tableA: 维表,维表里 value 是汉字 MySQL_tableB: join后的结果表。和 MySQL_tableA 不在同一台服务器上。 我直接在 flink sql client SELECT 是可以正常显示, 但 INSERT INTO MySQL_tableB SELECT 后到 MySQL_tableB 里去查看,汉字就乱码了。 大家有什么建议吗? 谢谢, 王磊 wangl...@geekplus.com.cn
FlinkSQL 入到 MySQL后汉字乱码
KafkaTable:kafka 消息 MySQL_tableA: 维表,维表里 value 是汉字 MySQL_tableB: join后的结果表。和 MySQL_tableA 不在同一台服务器上。 我直接在 flink sql client SELECT 是可以正常显示, 但 INSERT INTO MySQL_tableB SELECT 后到 MySQL_tableB 里去查看,汉字就乱码了。 大家有什么建议吗? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: Re: 不能实时读取实时写入到 Hive 的数据
谢谢,根本原因就是 flink sql-client 客户端默认没有设置 checkpoint 导致的。 wangl...@geekplus.com.cn Sender: Rui Li Send Time: 2020-07-14 18:29 Receiver: user-zh cc: Leonard Xu; 夏帅 Subject: Re: Re: 不能实时读取实时写入到 Hive 的数据 流数据写hive时,不管是分区表还是非分区表,commit都是通过checkpoint触发的。用SQL client的话可以在flink-conf.yaml里设置execution.checkpointing.interval来开启checkpoint On Tue, Jul 14, 2020 at 5:49 PM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > 我把问题简化一下,创建 Hive 表时不带任何参数 > > CREATE TABLE hive_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP > ) STORED AS parquet > > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, > dispatch_time FROM kafka_ods_wms_pick_order; > > 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢? > 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM > kafka_ods_wms_pick_order 确实是有数据返回的。 > > 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0. > 是需要让 job 做 checkpoint 才能写到 hdfs 上吗? > 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢? > > 谢谢, > 王磊 > > > > > wangl...@geekplus.com.cn > > > 发件人: Leonard Xu > 发送时间: 2020-07-14 17:20 > 收件人: user-zh; 夏帅 > 抄送: wangl...@geekplus.com.cn > 主题: Re: 不能实时读取实时写入到 Hive 的数据 > > Hi, wanglei > > 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive > 的分区已完成信息(通过metastore或success文件). > > 你看下夏帅贴的邮件,检查下 checkpoint 和 partition-commit的设置 > > 祝好, > Leonard Xu > > > 在 2020年7月14日,16:59,夏帅 写道: > > 你好, > 可以参考下这个问题的解决 > > http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html > > > -- > 发件人:wangl...@geekplus.com.cn > 发送时间:2020年7月14日(星期二) 16:50 > 收件人:user-zh ; 夏帅 ; > Leonard Xu > 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据 > > > 应该是我没有理解 partitiion-commit 的意思,我看这里有文档: > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit > > > CREATE TABLE kafka_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP(3) > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'ods_wms_pick_order', > 'properties.bootstrap.servers' = ':9092', > 'properties.group.id' = 'testGroup', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset' > ) > > > CREATE TABLE hive_ods_wms_pick_order ( > order_no STRING, > status INT, > dispatch_time TIMESTAMP > ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > 'sink.partition-commit.trigger'='partition-time', > 'sink.partition-commit.delay'='1 h', > 'sink.partition-commit.policy.kind'='metastore,success-file' > ); > > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, > dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), > DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; > SELECT * FROM hive_ods_wms_pick_order /*+ > OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-07-24') */; > > > > > wangl...@geekplus.com.cn > > > Sender: 夏帅 > Send Time: 2020-07-14 16:43 > Receiver: user-zh; xbjtdcq > Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 > 你好, > 这说明写入的hive文件没有进行rollup,可以贴下SQL么 > -- > 发件人:wangl...@geekplus.com.cn > 发送时间:2020年7月14日(星期二) 16:40 > 收件人:user-zh ; xbjtdcq > 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 > 我加上了这个 tablehint 。 > 任务提交上去了,但客户端还是没有任何返回显示。 > 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 > 谢谢, > 王磊 > wangl...@geekplus.com.cn > 发件人: Leonard Xu > 发送时间: 2020-07-14 16:17 > 收件人: user-zh > 主题: Re: 不能实时读取实时写入到 Hive 的数据 > HI, wanglei > 你开启了 streaming-source.enable > 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints > 方便地指定参数。 > SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-05-20') */; > 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 > 祝好, > Leonard Xu > 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink > webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > > > -- Best regards! Rui Li
回复: Re: 不能实时读取实时写入到 Hive 的数据
我把问题简化一下,创建 Hive 表时不带任何参数 CREATE TABLE hive_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP ) STORED AS parquet INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time FROM kafka_ods_wms_pick_order; 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢? 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM kafka_ods_wms_pick_order 确实是有数据返回的。 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0. 是需要让 job 做 checkpoint 才能写到 hdfs 上吗? 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢? 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: Leonard Xu 发送时间: 2020-07-14 17:20 收件人: user-zh; 夏帅 抄送: wangl...@geekplus.com.cn 主题: Re: 不能实时读取实时写入到 Hive 的数据 Hi, wanglei 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive 的分区已完成信息(通过metastore或success文件). 你看下夏帅贴的邮件,检查下 checkpoint 和 partition-commit的设置 祝好, Leonard Xu 在 2020年7月14日,16:59,夏帅 写道: 你好, 可以参考下这个问题的解决 http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html -- 发件人:wangl...@geekplus.com.cn 发送时间:2020年7月14日(星期二) 16:50 收件人:user-zh ; 夏帅 ; Leonard Xu 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit CREATE TABLE kafka_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'ods_wms_pick_order', 'properties.bootstrap.servers' = ':9092', 'properties.group.id' = 'testGroup', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) CREATE TABLE hive_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file' ); INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */; wangl...@geekplus.com.cn Sender: 夏帅 Send Time: 2020-07-14 16:43 Receiver: user-zh; xbjtdcq Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 你好, 这说明写入的hive文件没有进行rollup,可以贴下SQL么 ------ 发件人:wangl...@geekplus.com.cn 发送时间:2020年7月14日(星期二) 16:40 收件人:user-zh ; xbjtdcq 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道: 试验了一下 Flink-1.11 hive streaming 的功能 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html 创建 kafka 表,通过 SQL 实时写入 Hive. 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 谢谢, 王磊 wangl...@geekplus.com.cn
Re: 回复: 不能实时读取实时写入到 Hive 的数据
应该是我没有理解 partitiion-commit 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit CREATE TABLE kafka_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'ods_wms_pick_order', 'properties.bootstrap.servers' = ':9092', 'properties.group.id' = 'testGroup', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) CREATE TABLE hive_ods_wms_pick_order ( order_no STRING, status INT, dispatch_time TIMESTAMP ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file' ); INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order; SELECT * FROM hive_ods_wms_pick_order /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-24') */; wangl...@geekplus.com.cn Sender: 夏帅 Send Time: 2020-07-14 16:43 Receiver: user-zh; xbjtdcq Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据 你好, 这说明写入的hive文件没有进行rollup,可以贴下SQL么 -- 发件人:wangl...@geekplus.com.cn 发送时间:2020年7月14日(星期二) 16:40 收件人:user-zh ; xbjtdcq 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据 我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu > 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI > 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn >
回复: Re: 不能实时读取实时写入到 Hive 的数据
我加上了这个 tablehint 。 任务提交上去了,但客户端还是没有任何返回显示。 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: Leonard Xu 发送时间: 2020-07-14 16:17 收件人: user-zh 主题: Re: 不能实时读取实时写入到 Hive 的数据 HI, wanglei 你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。 SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; 就在你看得这个页面应该有对应的文档说明如何读取hive数据。 祝好, Leonard Xu > 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道: > > > 试验了一下 Flink-1.11 hive streaming 的功能 > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html > > 创建 kafka 表,通过 SQL 实时写入 Hive. > > 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI > 页面观察 这个 select * from hive_table 的 job 已经结束了。 > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn >
不能实时读取实时写入到 Hive 的数据
试验了一下 Flink-1.11 hive streaming 的功能 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html 创建 kafka 表,通过 SQL 实时写入 Hive. 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。 谢谢, 王磊 wangl...@geekplus.com.cn
Re: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
public void invoke(ObjectNode node, Context context) throws Exception { String tableName = node.get("metadata").get("topic").asText(); Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10)); meter.markEvent(); log.info("### counter: " + meter.toString() + "\t" + meter.getCount()); 如上面代码所示,在 invoke 方法中解析得到 tableName, 以 tableName 名字作为 metrics. 但这样写每一消息下来了后相当于重新定义了 这个 metrics , 又从 0 开始计数了。 谢谢, 王磊 wangl...@geekplus.com.cn Sender: kcz Send Time: 2020-07-03 09:13 Receiver: wanglei2 Subject: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 按照你的描述 你就是少了tablename,那么你解析log 得到了tablename又做metric就好了吧 -- 原始邮件 -- 发件人: 王磊2 发送时间: 2020年7月2日 21:46 收件人: user-zh , 17610775726 <17610775...@163.com> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 没有明白你说的实现方式。 我最终要得到类似的 Metrics: myCounter_table1, myCounter_table2, ..., myCounter_tableX 但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。 谢谢, 王磊 -- 发件人:JasonLee <17610775...@163.com> 发送时间:2020年7月2日(星期四) 21:12 收件人:user-zh 主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 你把tablename传到下面metric里不就行了吗 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道: 全都是同一种类型的 metrics. 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 meter 类型) 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: JasonLee 发送时间: 2020-07-02 16:16 收件人: user-zh 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 是要生成不同类型的metric吗 比如counter meter ? | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道: 官网上的例子: public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? 谢谢, 王磊 wangl...@geekplus.com.cn
回复: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
全都是同一种类型的 metrics. 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 meter 类型) 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: JasonLee 发送时间: 2020-07-02 16:16 收件人: user-zh 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 是要生成不同类型的metric吗 比如counter meter ? | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道: 官网上的例子: public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? 谢谢, 王磊 wangl...@geekplus.com.cn
在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
官网上的例子: public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? 谢谢, 王磊 wangl...@geekplus.com.cn
????: ??????Flink State ?????????? state ????????????
OrderState get set ?? flink ??OrderState?? wang...@geekplus.com.cn 1048262223 ?? 2020-06-09 18:11 user-zh ?? ??Flink State ?? state Hi flink??OrderStatepojo??savepoint?? Best, Yichao Yang -- -- ??: "wangl...@geekplus.com.cn"
Flink State 增加字段后 state 还能识别吗?
写了个简单的类会在 Flink State 中使用: public class OrderState { private Integer warehouseId; private String orderNo; private String ownerCode; private Long inputDate; private int orderType; private int amount = 0; private int status = 0; . } 现在程序要升级,这个类还要增加一个新的字段。从state 能正常恢复吗? 也就是 flink run -s savepointdir 后能正常识别旧的代码保存的 state 吗? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: 回复:在已有 Hadoop 外搭建 standalone 模式 HA flink 集群
我试了下是可以的,但现在有一个访问 HDFS 的问题。 我用的 hadoop 是阿里云 EMR 管理, 在 EMR 管理的机器上可以以 hdfs://emr-cluster:8020/ 访问 HDFS 但我部署的 Flink 不属于 EMR 管理,这个地址是不能解析的,我只能写成 hdfs://active-namenode-ip:8020/ 的形式,NameNode 丧失了 HA 的功能 有什么方式解决这个问题吗? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Andrew Send Time: 2020-05-07 12:31 Receiver: user-zh Subject: 回复:在已有 Hadoop 外搭建 standalone 模式 HA flink 集群 https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html ---原始邮件--- 发件人: "wangl...@geekplus.com.cn"
回复: 在已有 Hadoop 外搭建 standalone 模式 HA flink 集群
看起来这个文档可以,我先试下: https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html wangl...@geekplus.com.cn 发件人: wangl...@geekplus.com.cn 发送时间: 2020-05-07 12:23 收件人: user-zh 主题: 在已有 Hadoop 外搭建 standalone 模式 HA flink 集群 现在已经有了一个 Hadoop 集群。 我想在这个 集群外(不同的机器,网络互通)部署一个 standalone 模式的 flink cluster,配置 jobmanager 的 HA,访问和使用已有 Hadoop 的 HDFS,使用已有 Hadoop 的 zookeeper 这个有参考文档怎么操作吗? 谢谢, 王磊 wangl...@geekplus.com.cn
在已有 Hadoop 外搭建 standalone 模式 HA flink 集群
现在已经有了一个 Hadoop 集群。 我想在这个 集群外(不同的机器,网络互通)部署一个 standalone 模式的 flink cluster,配置 jobmanager 的 HA,访问和使用已有 Hadoop 的 HDFS,使用已有 Hadoop 的 zookeeper 这个有参考文档怎么操作吗? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: Re:FlinkSQL Retraction 问题原理咨询
Thanks Jingsong Lee. 我用的是 MySQL,sink 表中没有任何主键或唯一键. 如果 sink 表设主键或唯一键,确实能达到只保留两条记录的效果。 我把 flink sql-client 客户端设置 SET execution.result-mode=changelog 试验了下,左边标上了是第几条 kafka 消息导致的行为: +/- tms_company order_cnt 1 + zhongtong 1 2 - zhongtong 1 2 + yuantong 1 3 - yuantong 1 3 + yuantong 2 4 - yuantong 2 4 + yuantong 1 4 + zhongtong 1 第1条消息:执行一个 INSERT 第2条消息:执行了 一个 DELETE, 一个 INSERT 第3条消息:执行了一个 INSERT ON DUPLICATE UPDATE 第4条消息:执行了两个 INSERT ON DUPLICATE UPDATE 我总结这个逻辑应该是如果 回撤导致的结果变成 0 ,就会执行 delete , 否则就是INSERT ON DUPLICATE UPDATE 不知道我这样理解是否正确。 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Jingsong Li Send Time: 2020-05-06 11:35 Receiver: user-zh Subject: Re: Re:FlinkSQL Retraction 问题原理咨询 Hi, 问题一:删除数据可不单单只是retract stream的功能。upsert stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert stream也有retract的input数据的。JDBC实现的是upsert stream的消费。 问题二:正确数据应该是: 1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 ( 删除 zhongtong 1) 3 {"order_id":2,"tms_company":"yuantong"} 数据库1条记录: yuantong 2 ( 删除yuantong 1) 4 {"order_id":2,"tms_company":"zhongtong"} 数据库2条记录: yuantong 1, zhongtong 1( 删除yuantong 2) 你用了什么dialect?是不是mysql? Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。 看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建? Best, Jingsong Lee On Wed, May 6, 2020 at 10:36 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > > 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 > tms_company 是有变化的。 > 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。 > > > > > wangl...@geekplus.com.cn > > 发件人: Michael Ran > 发送时间: 2020-04-30 17:23 > 收件人: user-zh > 主题: Re:FlinkSQL Retraction 问题原理咨询 > > > > 指定的更新键是tms_company? > > > 结果是: > yuantong:2 > zhongtong:2 > > > > > > > > > > > > > 在 2020-04-30 17:08:22,"wangl...@geekplus.com.cn" > 写道: > > > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 > RDS, RDS 表没有主键,也没有唯一键。 > > > >INSERT INTO table_out select tms_company, count(distinct order_id) as > order_cnt from > >(select order_id, LAST_VALUE(tms_company) AS tms_company from > dwd_table group by order_id) > > group by tms_company; > > > > > >总共发送了 4 条消息,顺序如下: > > > >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 > > > >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 > (上一条记录被删除了) > > > >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, > yuantong 2 (增加了条记录,没有删除) > > > >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, > yuantong 2, yuantong 1, zhongtong 1(增加了两条记录,没有删除) > > > > > >问题一: > >第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 > https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc > 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? > > > >问题二: > > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? > > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? > > > >谢谢, > >王磊 > > > > > > > >wangl...@geekplus.com.cn > -- Best, Jingsong Lee
回复: Re:FlinkSQL Retraction 问题原理咨询
更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 tms_company 是有变化的。 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。 wangl...@geekplus.com.cn 发件人: Michael Ran 发送时间: 2020-04-30 17:23 收件人: user-zh 主题: Re:FlinkSQL Retraction 问题原理咨询 指定的更新键是tms_company? 结果是: yuantong:2 zhongtong:2 在 2020-04-30 17:08:22,"wangl...@geekplus.com.cn" 写道: > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS >表没有主键,也没有唯一键。 > >INSERT INTO table_out select tms_company, count(distinct order_id) as >order_cnt from >(select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table > group by order_id) > group by tms_company; > > >总共发送了 4 条消息,顺序如下: > >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 > >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 >(上一条记录被删除了) > >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 > (增加了条记录,没有删除) > >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, >yuantong 1, zhongtong 1(增加了两条记录,没有删除) > > >问题一: >第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 > https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc > 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? > >问题二: > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? > >谢谢, >王磊 > > > >wangl...@geekplus.com.cn
FlinkSQL Retraction 问题原理咨询
自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。 INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id) group by tms_company; 总共发送了 4 条消息,顺序如下: 1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1 2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 (上一条记录被删除了) 3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 (增加了条记录,没有删除) 4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1(增加了两条记录,没有删除) 问题一: 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢? 问题二: 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢? 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢? 谢谢, 王磊 wangl...@geekplus.com.cn
回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Thanks Leonard, JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO ON DUPLICATE KEY 吗? 这个在源代码哪个地方呢? 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: Leonard Xu 发送时间: 2020-04-27 12:58 收件人: user-zh 主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题 Hi,wanglei > INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1 > 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2, 旧的被删除,新的会添加上。 这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统 需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink) > 我看 > https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc > 没有 Retract 方式 > 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗? 现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract. 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。 > 如若不带 group by 直接: > INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src > 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢? 不带 group by时无法推导出query的 unique key,没法做按照unique key的更新, 只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可 Best, Leonard Xu
回复: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Thanks Leonard, JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO ON DUPLICATE KEY 吗? 这个在源代码哪个地方呢? 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: Leonard Xu 发送时间: 2020-04-27 12:58 收件人: user-zh 主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题 Hi,wanglei > INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1 > 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2, 旧的被删除,新的会添加上。 这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统 需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink) > 我看 > https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc > 没有 Retract 方式 > 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗? 现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract. 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。 > 如若不带 group by 直接: > INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src > 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢? 不带 group by时无法推导出query的 unique key,没法做按照unique key的更新, 只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可 Best, Leonard Xu
FlinkSQL Upsert/Retraction 写入 MySQL 的问题
INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2, 旧的被删除,新的会添加上。 但这个 Sink 到底是用到了 UpsertStream 还是 RetractStream 呢,怎么判断是 UpsertStream 还是 RetractStream 呢? 我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗? 如若不带 group by 直接: INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢? wangl...@geekplus.com.cn
Re: Re: 实现 KafkaUpsertTableSink
我只保留 KafkaRetractTableSourceSinkFactory 一个, KafkaRetractTableSinkBase 实现 RetractStreamTableSink 接口,在 consumeDataStream 实现只有 True 才发送,最终 work 了。 @Override public DataStreamSink consumeDataStream(DataStream> dataStream) { DataStream dtNeed = dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -> x.f1); INSERT INTO table1 SELCET field, count(*) from table2 group by field 这是 一个 RetractStream,结果里面会有 True/False, 通过这个过滤是可以的。 INSERT INTO table1 SELECT feild, 1 from table2 我理解这不是一个 RetractStream, 上面 dataStream.filter(x -> x.f0 == Boolean.TRUE) 的代码应该会出错,但实际上没有出错 还不是完全能理解,我再看一下吧。 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Benchao Li Send Time: 2020-03-31 12:02 Receiver: user-zh Subject: Re: Re: 实现 KafkaUpsertTableSink 我理解你可以让KafkaRetractTableSourceSinkFactory的参数跟KafkaTableSourceSinkFactory的参数有不同的地方, 然后通过这个参数来区分两个不同的factory。比如加一个参数,表示这个sink是retract还是append类型之类的? wangl...@geekplus.com.cn 于2020年3月31日周二 上午11:17写道: > 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 > KafkaRetractTableSourceSinkFactory 写了一遍 > 但这个应该怎样改才合适呢? > > 137 private static T > findSingleInternal( > 138 Class factoryClass, > 139 Map properties, > 140 Optional classLoader) { > 141 > 142 List tableFactories = > discoverFactories(classLoader); > 143 List filtered = filter(tableFactories, > factoryClass, properties); > 144 > 145 if (filtered.size() > 1) { > 146 throw new AmbiguousTableFactoryException( > 147 filtered, > 148 factoryClass, > 149 tableFactories, > 150 properties); > 151 } else { > 152 return filtered.get(0); > 153 } > 154 } > > > 谢谢, > 王磊 > > > wangl...@geekplus.com.cn > > > Sender: wangl...@geekplus.com.cn > Send Time: 2020-03-31 10:50 > Receiver: user-zh > Subject: Re: RE: 实现 KafkaUpsertTableSink > > 我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: > > org.apache.flink.table.planner.delegation.BlinkExecutorFactory > at > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) > at > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) > at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118) > at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) > ... 3 more > > 这个改怎样解决呢? > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > > Sender: wxchunj...@163.com > Send Time: 2020-03-29 10:32 > Receiver: user-zh@flink.apache.org > Subject: RE: 实现 KafkaUpsertTableSink > Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 > -Original Message- > From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org > On Behalf Of > Benchao Li > Sent: Saturday, March 28, 2020 6:28 PM > To: user-zh > Subject: Re: 实现 KafkaUpsertTableSink > Hi, > 你需要把你新增的Factory添加到 resources下的 > > META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? > 于2020年3月28日周六 下午5:38写道: > > 各位大佬: > > > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > > KafkaUpsertTableSink: > > > > KafkaUpsertTableSink > > > > KafkaUpsertTableSinkBase > > > > KafkaUpsertTableSourceSinkFactory > > > > KafkaUpsertTableSourceSinkFactoryBase > > > > MyKafkaValidator > > > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > > 呢? > >
Re: Re: 实现 KafkaUpsertTableSink
这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 KafkaRetractTableSourceSinkFactory 写了一遍 但这个应该怎样改才合适呢? 137 private static T findSingleInternal( 138 Class factoryClass, 139 Map properties, 140 Optional classLoader) { 141 142 List tableFactories = discoverFactories(classLoader); 143 List filtered = filter(tableFactories, factoryClass, properties); 144 145 if (filtered.size() > 1) { 146 throw new AmbiguousTableFactoryException( 147 filtered, 148 factoryClass, 149 tableFactories, 150 properties); 151 } else { 152 return filtered.get(0); 153 } 154 } 谢谢, 王磊 wangl...@geekplus.com.cn Sender: wangl...@geekplus.com.cn Send Time: 2020-03-31 10:50 Receiver: user-zh Subject: Re: RE: 实现 KafkaUpsertTableSink 我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: org.apache.flink.table.planner.delegation.BlinkExecutorFactory at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) ... 3 more 这个改怎样解决呢? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: wxchunj...@163.com Send Time: 2020-03-29 10:32 Receiver: user-zh@flink.apache.org Subject: RE: 实现 KafkaUpsertTableSink Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 -Original Message- From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org On Behalf Of Benchao Li Sent: Saturday, March 28, 2020 6:28 PM To: user-zh Subject: Re: 实现 KafkaUpsertTableSink Hi, 你需要把你新增的Factory添加到 resources下的 META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? 于2020年3月28日周六 下午5:38写道: > 各位大佬: > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > KafkaUpsertTableSink: > > KafkaUpsertTableSink > > KafkaUpsertTableSinkBase > > KafkaUpsertTableSourceSinkFactory > > KafkaUpsertTableSourceSinkFactoryBase > > MyKafkaValidator > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > 呢? > > > > > /** > * Searches for factories using Java service providers. > * > * @return all factories in the classpath */ private static > List discoverFactories(Optional > classLoader) { >try { > List result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > //todo add > result.add(new KafkaUpsertTableSourceSinkFactory()); > return result; >} catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for > table factories.", e); >} > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > -- > > Thanks > > venn > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
Re: RE: 实现 KafkaUpsertTableSink
我以相同的方式 实现了一个 KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client: org.apache.flink.table.planner.delegation.BlinkExecutorFactory at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742) ... 3 more 这个改怎样解决呢? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: wxchunj...@163.com Send Time: 2020-03-29 10:32 Receiver: user-zh@flink.apache.org Subject: RE: 实现 KafkaUpsertTableSink Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。 -Original Message- From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org On Behalf Of Benchao Li Sent: Saturday, March 28, 2020 6:28 PM To: user-zh Subject: Re: 实现 KafkaUpsertTableSink Hi, 你需要把你新增的Factory添加到 resources下的 META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢? 于2020年3月28日周六 下午5:38写道: > 各位大佬: > > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 > KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 > KafkaUpsertTableSink: > > KafkaUpsertTableSink > > KafkaUpsertTableSinkBase > > KafkaUpsertTableSourceSinkFactory > > KafkaUpsertTableSourceSinkFactoryBase > > MyKafkaValidator > > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 > 呢? > > > > > /** > * Searches for factories using Java service providers. > * > * @return all factories in the classpath */ private static > List discoverFactories(Optional > classLoader) { >try { > List result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > //todo add > result.add(new KafkaUpsertTableSourceSinkFactory()); > return result; >} catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for > table factories.", e); >} > > } > > > > > > 直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 > 以成功运行的。 > > 非常感谢 > > > > > > -- > > Thanks > > venn > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
Re: Re: flink 安装包的几个 jar 是怎么 build 出来的
flink-table-uber-blink 下 mvn clean install -DskipTests -Dscala-2.12 -DskipTests 不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Kurt Young Send Time: 2020-03-26 18:15 Receiver: user-zh cc: jihongchao Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的 flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar) Best, Kurt On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > > 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar > 这个 jar 是从哪里 build 出来的呢? > > 我 clone github 上的源代码,mvn clean package > 我以为 flink-table/flink-table-planner-blink 目录下build 出的 > flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的 > flink-table-blink_2.12-1.10.0.jar 是对应的 > 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。 > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > >
flink 安装包的几个 jar 是怎么 build 出来的
单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar 这个 jar 是从哪里 build 出来的呢? 我 clone github 上的源代码,mvn clean package 我以为 flink-table/flink-table-planner-blink 目录下build 出的 flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的 flink-table-blink_2.12-1.10.0.jar 是对应的 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。 谢谢, 王磊 wangl...@geekplus.com.cn
回复: Flink JDBC Driver是否支持创建流数据表
参考下这个文档: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector 下面的语法应该是不支持的: 'format.type' = 'csv',\n" + "'format.field-delimiter' = '|'\n" 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90} tEnv.sqlUpdate("CREATE TABLE pick_order (\n" + "order_no VARCHAR,\n" + "status INT\n" + ") WITH (\n" + "'connector.type' = 'kafka',\n" + "'connector.version' = 'universal',\n" + "'connector.topic' = 'wanglei_test',\n" + "'connector.startup-mode' = 'latest-offset',\n" + "'connector.properties.0.key' = 'zookeeper.connect',\n" + "'connector.properties.0.value' = 'xxx:2181',\n" + "'connector.properties.1.key' = 'bootstrap.servers',\n" + "'connector.properties.1.value' = 'xxx:9092',\n" + "'update-mode' = 'append',\n" + "'format.type' = 'json',\n" + "'format.derive-schema' = 'true'\n" + ")"); 王磊 wangl...@geekplus.com.cn 发件人: 赵峰 发送时间: 2020-03-24 21:28 收件人: user-zh 主题: Flink JDBC Driver是否支持创建流数据表 hi Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下: Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink"); Statement statement = connection.createStatement(); statement.executeUpdate( "CREATE TABLE table_kafka (\n" + "user_id BIGINT,\n" + "item_id BIGINT,\n" + "category_id BIGINT,\n" + "behavior STRING,\n" + "ts TIMESTAMP(3),\n" + "proctime as PROCTIME(),\n" + "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal', \n" + "'connector.topic' = 'flink_im02', \n" + "'connector.properties.group.id' = 'flink_im02_new',\n" + "'connector.startup-mode' = 'earliest-offset', \n" + "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" + "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" + "'format.type' = 'csv',\n" + "'format.field-delimiter' = '|'\n" + ")"); ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka"); while (rs1.next()) { System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2)); } statement.close(); connection.close(); 报错: Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' 赵峰
Re: Re: flinkSQL join表的历史信息保存在哪里保存多久
Thanks, it works. wangl...@geekplus.com.cn Sender: sunfulin Send Time: 2020-03-12 14:19 Receiver: user-zh; wanglei2 cc: jinhai.me Subject: Re:Re: Re: flinkSQL join表的历史信息保存在哪里保存多久 这样来用: StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max); 在 2020-03-12 14:11:31,"wangl...@geekplus.com.cn" 写道: > >这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。 >StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法 >StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(); >StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); >StreamQueryConfig qConfig = tableEnv.queryConfig(); > > > >wangl...@geekplus.com.cn > > >Sender: jinhai wang >Send Time: 2020-03-12 13:44 >Receiver: user-zh@flink.apache.org >Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久 >应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time > > >在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入: > > >两个从 kafka 创建的表: > >tableA: key valueA >tableB: key valueB > >用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from > tableA join tableB on tableA.key = tableB.key; >这两个表的历史数据在 flink 中存在哪里?存多久呢? > >比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗? > >谢谢, >王磊 > > > >wangl...@geekplus.com.cn > >
Re: Re: flinkSQL join表的历史信息保存在哪里保存多久
这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。 StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); StreamQueryConfig qConfig = tableEnv.queryConfig(); wangl...@geekplus.com.cn Sender: jinhai wang Send Time: 2020-03-12 13:44 Receiver: user-zh@flink.apache.org Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久 应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time 在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入: 两个从 kafka 创建的表: tableA: key valueA tableB: key valueB 用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from tableA join tableB on tableA.key = tableB.key; 这两个表的历史数据在 flink 中存在哪里?存多久呢? 比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗? 谢谢, 王磊 wangl...@geekplus.com.cn
flinkSQL join 表的历史信息保存在哪里保存多久
两个从 kafka 创建的表: tableA: key valueA tableB: key valueB 用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from tableA join tableB on tableA.key = tableB.key; 这两个表的历史数据在 flink 中存在哪里?存多久呢? 比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
我试了下,是可以的。 Thanks wangl...@geekplus.com.cn Sender: Kurt Young Send Time: 2020-03-11 19:59 Receiver: wangl...@geekplus.com.cn cc: user-zh Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? 那有可能是可以的,你可以试试看 Best, Kurt On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn wrote: Hi Kurt, 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗? 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state 存储并且再次提交任务可以被访问到直接用吗? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Kurt Young Send Time: 2020-03-11 12:54 Receiver: wangl...@geekplus.com.cn cc: user-zh Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? sql client 目前还不支持这个功能。 Best, Kurt On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn wrote: Hi Kurt, 确实是可以 直接 flink cancel -s 保存状态。 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? 谢谢, 王磊 Sender: Kurt Young Send Time: 2020-03-11 10:38 Receiver: user-zh Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 Best, Kurt On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > 有两个表: > tableA: key valueA > tableB: key valueB > > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? > > 谢谢, > 王磊 >
Re: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Thanks Jark, No word to express my '囧'. wangl...@geekplus.com.cn Sender: Jark Wu Send Time: 2020-03-11 18:32 Receiver: wangl...@geekplus.com.cn cc: user; user-zh Subject: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别 Hi Lei, The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong field name in the DDL. It should be "input_date", not "intput_date". Best, Jark On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn wrote: Sorry i sent the Chinese written email to user@ Let me translate it to English. I create a table using sql-client from kafka topic: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) Then I send message to the topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} But the input_date is not recognized on the sql-client and is null, even i tried 1583828700240 "2020-03-11 13:00:00" "2020-03-11 13:00:00.000" How should the timestamp(3) look like in the json message? Thanks, Lei wangl...@geekplus.com.cn 发件人: wangl...@geekplus.com.cn 发送时间: 2020-03-11 17:41 收件人: user 主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别 用 sql-client create 了一个 kafka table: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) 然后往 kafka 这个 topic 发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} input_date 在 sql-clinet 端始终是 NULL. 我把 发送的 input_date 改成 1583828700240 "2020-03-11 13:00:00" "2020-03-11 13:00:00.000" 也都不行。 这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
Hi Kurt, 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗? 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state 存储并且再次提交任务可以被访问到直接用吗? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Kurt Young Send Time: 2020-03-11 12:54 Receiver: wangl...@geekplus.com.cn cc: user-zh Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? sql client 目前还不支持这个功能。 Best, Kurt On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn wrote: Hi Kurt, 确实是可以 直接 flink cancel -s 保存状态。 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? 谢谢, 王磊 Sender: Kurt Young Send Time: 2020-03-11 10:38 Receiver: user-zh Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 Best, Kurt On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > 有两个表: > tableA: key valueA > tableB: key valueB > > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? > > 谢谢, > 王磊 >
回复: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Sorry i sent the Chinese written email to user@ Let me translate it to English. I create a table using sql-client from kafka topic: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) Then I send message to the topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} But the input_date is not recognized on the sql-client and is null, even i tried 1583828700240 "2020-03-11 13:00:00" "2020-03-11 13:00:00.000" How should the timestamp(3) look like in the json message? Thanks, Lei wangl...@geekplus.com.cn 发件人: wangl...@geekplus.com.cn 发送时间: 2020-03-11 17:41 收件人: user 主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别 用 sql-client create 了一个 kafka table: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) 然后往 kafka 这个 topic 发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} input_date 在 sql-clinet 端始终是 NULL. 我把 发送的 input_date 改成 1583828700240 "2020-03-11 13:00:00" "2020-03-11 13:00:00.000" 也都不行。 这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
Hi Kurt, 确实是可以 直接 flink cancel -s 保存状态。 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? 谢谢, 王磊 Sender: Kurt Young Send Time: 2020-03-11 10:38 Receiver: user-zh Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 Best, Kurt On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > 有两个表: > tableA: key valueA > tableB: key valueB > > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? > > 谢谢, > 王磊 >
flink sql join 可以有 state 存储并从 state 恢复数据吗?
有两个表: tableA: key valueA tableB: key valueB 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? 谢谢, 王磊
join key 有重复的双流 join 怎样去重后发送到 kafka
有两个 kafka 作为数据源的表 order_info: order_no info order_status: order_no status 两个表的 order_no 都会有重复,来一条其中一个表的记录,会在另外一个表中找到多条记录。 我怎样实现在另外一个表中只取出与该 join key 相关的最新的一条记录并发送到 kafka 中呢? kafka 只支持 append 模式的 sink,先把 表 group 再join 行不通。 谢谢, 王磊
Re: Re: Flink State 过期清除 TTL 问题
Hi 唐云, 我的集群已经升到了 1.8.2, cleanupFullSnapshot 和 cleanupInRocksdbCompactFilter 都试验了下。 但 cancel -s 停止后, 生成的 savepoint 目录还是没有变小。过程是这样的: cancel -s 停止,savepoint 目录大小为 100M 代码变更,把原来的 setUpdateType 变为 cleanupFullSnapshot 新的代码从 1 的 savepoint 目录恢复 新的代码运行一天左右,再 cancel -s, 新的 savepoint 目录变大 会不会是 每次 flink run -s ,从已有 savepoint 目录中恢复的数据所有的 updateTime 都变成了当前时间? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Yun Tang Send Time: 2019-11-01 01:38 Receiver: user-zh@flink.apache.org Subject: Re: Flink State 过期清除 TTL 问题 Hi 王磊 从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置 cleanupFullSnapshot,这样你在执行full snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。 另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state [2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv 祝好 唐云 On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn" wrote: flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build(); ValueStateDescriptor descriptor = new ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class)); descriptor.enableTimeToLive(ttlConfig); 程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。 我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗? 谢谢, 王磊 wangl...@geekplus.com.cn
state TTL 变更问题
有一个程序用到了 state, 设置 TTL 为3天。 运行一段时间后 cancel -s 停止,把过期时间设为 7 天,再从 state 文件恢复运行。 cancel -s 停止时生成的文件里面的所有 key,TTL 都会变成 7 天吗? 还是依然是 3 天? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: Re: 怎样把 state 定时写到外部存储
Hi Congxian, 以 sink 的方式写出去最终也要落在某个地方才能供查询使用啊。 我们的 case 是写到 MySQL 中 wangl...@geekplus.com.cn Sender: Congxian Qiu Send Time: 2019-11-01 10:10 Receiver: user-zh Subject: Re: 怎样把 state 定时写到外部存储 好奇为什么要把 State 定期写到外存呢?是外部系统需要使用这些 State 吗?那为什么不把 State 以 sink 的方式写出去呢? Best, Congxian Jun Zhang <825875...@qq.com> 于2019年10月31日周四 上午10:36写道: > 是否可以注册一个定时器? > > > 你看看这个文章,是否对你有帮助 > > > https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA > 在2019年10月31日 10:16,wangl...@geekplus.com.cn 写道: > > > 消息驱动,QPS 很高, 每一条消息来都会更改 state 值,如果每一条消息来都写外部存储下游撑不住。 > 有没有什么方式可以定期读 state 写到外部存储? > 我现在用的是 Flink1.7.2 版本。 > > > > > > wangl...@geekplus.com.cn
Re: Re: Flink State 过期清除 TTL 问题
谢谢,了解了。 王磊 wangl...@geekplus.com.cn Sender: Yun Tang Send Time: 2019-11-01 01:38 Receiver: user-zh@flink.apache.org Subject: Re: Flink State 过期清除 TTL 问题 Hi 王磊 从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置 cleanupFullSnapshot,这样你在执行full snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。 另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state [2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv 祝好 唐云 On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn" wrote: flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build(); ValueStateDescriptor descriptor = new ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class)); descriptor.enableTimeToLive(ttlConfig); 程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。 我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗? 谢谢, 王磊 wangl...@geekplus.com.cn
Flink State 过期清除 TTL 问题
flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build(); ValueStateDescriptor descriptor = new ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class)); descriptor.enableTimeToLive(ttlConfig); 程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。 我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗? 谢谢, 王磊 wangl...@geekplus.com.cn
怎样把 state 定时写到外部存储
消息驱动,QPS 很高, 每一条消息来都会更改 state 值,如果每一条消息来都写外部存储下游撑不住。 有没有什么方式可以定期读 state 写到外部存储? 我现在用的是 Flink1.7.2 版本。 wangl...@geekplus.com.cn
Re: Re: CsvTableSink 目录没有写入具体的数据
抱歉,是我搞错了。 实际上是写入数据的。我在 windows 下做测试,刷新下文件的大小始终是 0 , 只有编辑看下那个文件显示的文件大小才会变更。 wangl...@geekplus.com.cn Sender: Alec Chen Send Time: 2019-08-09 10:17 Receiver: user-zh Subject: Re: Re: CsvTableSink 目录没有写入具体的数据 没数据是因为没有trigger执行, 参考sample code from doc( https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html ) // get a StreamTableEnvironment, works for BatchTableEnvironment equivalentlyStreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create a TableSinkTableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); // register the TableSink with a specific schemaString[] fieldNames = {"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink); // compute a result Table using Table API operators and/or SQL queriesTable result = ...// emit the result Table to the registered TableSinkresult.insertInto("CsvSinkTable"); // execute the program 加上 tableEnv.execute(); wangl...@geekplus.com.cn 于2019年8月9日周五 上午9:42写道: > > 我接入了一个 RocketMQ 的流作为输入。 > > > DataStream> ds = env.addSource(new > RocketMQSource( > > System.out.println(res); > return res; > } > }); > > > tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, > pick_list_no, sku_code"); > > TableSink csvSink = new CsvTableSink("D:\\data\\flink",","); > String[] fieldNames = {"num"}; > TypeInformation[] fieldTypes = {Types.INT}; > tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, > csvSink); > tableEnv.sqlUpdate( > "INSERT INTO RubberOrders SELECT pick_task_id FROM > t_pick_task"); > > > > wangl...@geekplus.com.cn > > Sender: Alec Chen > Send Time: 2019-08-08 21:01 > Receiver: user-zh > Subject: Re: CsvTableSink 目录没有写入具体的数据 > 完整代码发一下 > > wangl...@geekplus.com.cn 于2019年8月8日周四 下午7:37写道: > > > > > 我按官网上的 > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query > > 例子写的代码 > > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢? > > > > > > > > wangl...@geekplus.com.cn > > >
Re: Re: CsvTableSink 目录没有写入具体的数据
我接入了一个 RocketMQ 的流作为输入。 DataStream> ds = env.addSource(new RocketMQSource( System.out.println(res); return res; } }); tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, pick_list_no, sku_code"); TableSink csvSink = new CsvTableSink("D:\\data\\flink",","); String[] fieldNames = {"num"}; TypeInformation[] fieldTypes = {Types.INT}; tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink); tableEnv.sqlUpdate( "INSERT INTO RubberOrders SELECT pick_task_id FROM t_pick_task"); wangl...@geekplus.com.cn Sender: Alec Chen Send Time: 2019-08-08 21:01 Receiver: user-zh Subject: Re: CsvTableSink 目录没有写入具体的数据 完整代码发一下 wangl...@geekplus.com.cn 于2019年8月8日周四 下午7:37写道: > > 我按官网上的 > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query > 例子写的代码 > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢? > > > > wangl...@geekplus.com.cn >
CsvTableSink 目录没有写入具体的数据
我按官网上的 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query 例子写的代码 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢? wangl...@geekplus.com.cn