flink-1.11 在 windows 下怎样启动

2020-07-23 文章 wangl...@geekplus.com.cn

我看 flink-1.11 发布包 bin 目录没有 windows 启动所需的 .bat 文件了。
那在 windows 下怎样启动呢?

谢谢
王磊




wangl...@geekplus.com.cn 


flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到 kafka

2020-07-16 文章 wangl...@geekplus.com.cn

 INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*) from 
kafka_ods_artemis_out_order group by warehouse_id;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 
'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming update 
changes which is produced by node GroupAggregate(groupBy=[warehouse_id], 
select=[warehouse_id, COUNT(*) AS EXPR$1])

在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。
 
我看现在 Flink-1.11 中是用了  KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让 GroupBy 
的结果也发送到 Kafka 呢?

谢谢,
王磊 


wangl...@geekplus.com.cn 



Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 文章 wangl...@geekplus.com.cn

 谢谢,我理解了。



wangl...@geekplus.com.cn 

Sender: Harold.Miao
Send Time: 2020-07-16 19:33
Receiver: user-zh
Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码
 
private static  T findSingleInternal(
  Class factoryClass,
  Map properties,
  Optional classLoader) {
 
   List tableFactories = discoverFactories(classLoader);
   List filtered = filter(tableFactories, factoryClass, properties);
 
   if (filtered.size() > 1) {
  throw new AmbiguousTableFactoryException(
 filtered,
 factoryClass,
 tableFactories,
 properties);
   } else {
  return filtered.get(0);
   }
}
 
private static List
discoverFactories(Optional classLoader) {
   try {
  List result = new LinkedList<>();
  ClassLoader cl =
classLoader.orElse(Thread.currentThread().getContextClassLoader());
  ServiceLoader
 .load(TableFactory.class, cl)
 .iterator()
 .forEachRemaining(result::add);
  return result;
   } catch (ServiceConfigurationError e) {
  LOG.error("Could not load service provider for table factories.", e);
  throw new TableException("Could not load service provider for
table factories.", e);
   }
 
}
 
 
wangl...@geekplus.com.cn  于2020年7月16日周四 下午7:04写道:
 
>
> 我在
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> 找到了 SPI 的配置:
>
> org.apache.flink.formats.json.JsonFileSystemFormatFactory
> org.apache.flink.formats.json.JsonFormatFactory
> org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> org.apache.flink.formats.json.canal.CanalJsonFormatFactory
>
> 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> 代码没找到类似的关系映射配置。
>
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: godfrey he
> Send Time: 2020-07-16 16:38
> Receiver: user-zh
> Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn  于2020年7月16日周四
> 下午4:02写道:
>
> > 比如:
> >
> > CREATE TABLE my_table (
> >   id BIGINT,
> >  first_name STRING,
> >  last_name STRING,
> >  email STRING
> > ) WITH (
> >  'connector'='kafka',
> >  'topic'='user_topic',
> >  'properties.bootstrap.servers'='localhost:9092',
> >  'scan.startup.mode'='earliest-offset',
> >  'format'='debezium-json'
> > );
> >
> > 最终解析 debezium-json 应该是
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > 下面的代码
> > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> >
> > 谢谢,
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>
 
 
-- 
 
Best Regards,
Harold Miao


Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 文章 wangl...@geekplus.com.cn

我在 
flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 找到了 SPI 的配置:

org.apache.flink.formats.json.JsonFileSystemFormatFactory
org.apache.flink.formats.json.JsonFormatFactory
org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
org.apache.flink.formats.json.canal.CanalJsonFormatFactory

还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 
代码没找到类似的关系映射配置。


谢谢,
王磊



wangl...@geekplus.com.cn 

 
Sender: godfrey he
Send Time: 2020-07-16 16:38
Receiver: user-zh
Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
 
Best,
Godfrey
 
wangl...@geekplus.com.cn  于2020年7月16日周四 下午4:02写道:
 
> 比如:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> 最终解析 debezium-json 应该是
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> 下面的代码
> 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>


Re: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval

2020-07-16 文章 wangl...@geekplus.com.cn

直接在 flink-conf.yaml 文件中加配置
execution.checkpointing.interval: 6




wangl...@geekplus.com.cn 

 
Sender: Harold.Miao
Send Time: 2020-07-16 13:27
Receiver: user-zh
Subject: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval
hi flink users
 
通过sql-client提交sql怎么设置checkpointing.interval?
我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
谢谢
 
 
 
-- 
 
Best Regards,
Harold Miao


FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 文章 wangl...@geekplus.com.cn
比如:

CREATE TABLE my_table (
  id BIGINT,
 first_name STRING,
 last_name STRING,
 email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);

最终解析 debezium-json 应该是  
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 
下面的代码
但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?

谢谢,
王磊


wangl...@geekplus.com.cn 



回复: FlinkSQL 入到 MySQL后汉字乱码

2020-07-15 文章 wangl...@geekplus.com.cn

是 MySQL_tableB 所在的 server 端字符设置有问题。
配置中加上下面的配置就好了。


[mysqld] character-set-server=utf8 [client] default-character-set=utf8 [mysql] 
default-character-set=utf8




wangl...@geekplus.com.cn 

发件人: wangl...@geekplus.com.cn
发送时间: 2020-07-15 16:34
收件人: user-zh
主题: FlinkSQL 入到 MySQL后汉字乱码
 
KafkaTable:kafka 消息
MySQL_tableA:  维表,维表里 value 是汉字
MySQL_tableB:  join后的结果表。和 MySQL_tableA 不在同一台服务器上。
 
我直接在 flink sql client   SELECT 是可以正常显示, 但 INSERT INTO MySQL_tableB SELECT 后到 
MySQL_tableB 里去查看,汉字就乱码了。
大家有什么建议吗?
 
谢谢,
王磊
 
 
 
wangl...@geekplus.com.cn 


Re: Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn

谢谢,根本原因就是  flink sql-client 客户端默认没有设置 checkpoint 导致的。



wangl...@geekplus.com.cn 

 
Sender: Rui Li
Send Time: 2020-07-14 18:29
Receiver: user-zh
cc: Leonard Xu; 夏帅
Subject: Re: Re: 不能实时读取实时写入到 Hive 的数据
流数据写hive时,不管是分区表还是非分区表,commit都是通过checkpoint触发的。用SQL
client的话可以在flink-conf.yaml里设置execution.checkpointing.interval来开启checkpoint
 
On Tue, Jul 14, 2020 at 5:49 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 我把问题简化一下,创建 Hive 表时不带任何参数
>
> CREATE TABLE hive_ods_wms_pick_order (
>   order_no STRING,
>   status INT,
>   dispatch_time TIMESTAMP
> )  STORED AS parquet
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time FROM kafka_ods_wms_pick_order;
>
> 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢?
> 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM
> kafka_ods_wms_pick_order 确实是有数据返回的。
>
> 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0.
> 是需要让 job 做 checkpoint 才能写到 hdfs 上吗?
> 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢?
>
> 谢谢,
> 王磊
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 17:20
> 收件人: user-zh; 夏帅
> 抄送: wangl...@geekplus.com.cn
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
>
> Hi, wanglei
>
> 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive
> 的分区已完成信息(通过metastore或success文件).
>
> 你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置
>
> 祝好,
> Leonard Xu
>
>
> 在 2020年7月14日,16:59,夏帅  写道:
>
> 你好,
> 可以参考下这个问题的解决
>
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
>
>
> --
> 发件人:wangl...@geekplus.com.cn 
> 发送时间:2020年7月14日(星期二) 16:50
> 收件人:user-zh ; 夏帅 ;
> Leonard Xu 
> 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据
>
>
> 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
>
>
> CREATE TABLE kafka_ods_wms_pick_order (
> order_no STRING,
> status INT,
> dispatch_time TIMESTAMP(3)
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ods_wms_pick_order',
> 'properties.bootstrap.servers' = ':9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
>
>
> CREATE TABLE hive_ods_wms_pick_order (
>  order_no STRING,
>  status INT,
>  dispatch_time TIMESTAMP
> ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>  'sink.partition-commit.trigger'='partition-time',
>  'sink.partition-commit.delay'='1 h',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'),
> DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> SELECT * FROM hive_ods_wms_pick_order /*+
> OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-07-24') */;
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: 夏帅
> Send Time: 2020-07-14 16:43
> Receiver: user-zh; xbjtdcq
> Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
> 你好,
> 这说明写入的hive文件没有进行rollup,可以贴下SQL么
> --
> 发件人:wangl...@geekplus.com.cn 
> 发送时间:2020年7月14日(星期二) 16:40
> 收件人:user-zh ; xbjtdcq 
> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
> 我加上了这个 tablehint 。
> 任务提交上去了,但客户端还是没有任何返回显示。
> 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
> 谢谢,
> 王磊
> wangl...@geekplus.com.cn
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 16:17
> 收件人: user-zh
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
> HI, wanglei
> 你开启了 streaming-source.enable
> 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints
> 方便地指定参数。
> SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-05-20') */;
> 就在你看得这个页面应该有对应的文档说明如何读取hive数据。
> 祝好,
> Leonard Xu
> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink
> webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
>
 
-- 
Best regards!
Rui Li


回复: Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn
我把问题简化一下,创建 Hive 表时不带任何参数

CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
)  STORED AS parquet

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time FROM kafka_ods_wms_pick_order;

我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢?
我在 flink 客户端 SELECT order_no, status, dispatch_time FROM 
kafka_ods_wms_pick_order 确实是有数据返回的。

我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0.
是需要让 job 做 checkpoint 才能写到 hdfs 上吗?
我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢?

谢谢,
王磊




wangl...@geekplus.com.cn 

 
发件人: Leonard Xu
发送时间: 2020-07-14 17:20
收件人: user-zh; 夏帅
抄送: wangl...@geekplus.com.cn
主题: Re: 不能实时读取实时写入到 Hive 的数据

Hi, wanglei

这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive 
的分区已完成信息(通过metastore或success文件).

你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置

祝好,
Leonard Xu


在 2020年7月14日,16:59,夏帅  写道:

你好,
可以参考下这个问题的解决
http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html


--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:50
收件人:user-zh ; 夏帅 ; Leonard Xu 

主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据


应该是我没有理解 partitiion-commit 
的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
order_no STRING,
status INT,
dispatch_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'ods_wms_pick_order',
'properties.bootstrap.servers' = ':9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 h',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), 
DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-07-24') */;




wangl...@geekplus.com.cn


Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么
--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh ; xbjtdcq 
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 
谢谢,
王磊
wangl...@geekplus.com.cn 
发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
祝好,
Leonard Xu
在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:


试验了一下 Flink-1.11  hive streaming 的功能
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

创建 kafka 表,通过 SQL 实时写入 Hive.

但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
页面观察 这个 select * from hive_table 的 job 已经结束了。

谢谢,
王磊



wangl...@geekplus.com.cn 




Re: 回复: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn
应该是我没有理解 partitiion-commit 
的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'ods_wms_pick_order',
 'properties.bootstrap.servers' = ':9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), 
DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-07-24') */;




wangl...@geekplus.com.cn 

 
Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么
 
 
--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh ; xbjtdcq 
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
 
 
我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 
 
谢谢,
王磊
 
 
 
wangl...@geekplus.com.cn 
 
发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
 
你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
 
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
 
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
 
祝好,
Leonard Xu
 
> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> 
> 
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> 创建 kafka 表,通过 SQL 实时写入 Hive.
> 
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
> 页面观察 这个 select * from hive_table 的 job 已经结束了。
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com.cn 
> 
 


回复: Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn

我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 

谢谢,
王磊



wangl...@geekplus.com.cn 

发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
 
你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
 
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
 
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
 
祝好,
Leonard Xu
 
> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> 
> 
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> 创建 kafka 表,通过 SQL 实时写入 Hive.
> 
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
> 页面观察 这个 select * from hive_table 的 job 已经结束了。
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com.cn 
> 
 


不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn

试验了一下 Flink-1.11  hive streaming 的功能
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

创建 kafka 表,通过 SQL 实时写入 Hive.

但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
页面观察 这个 select * from hive_table 的 job 已经结束了。

谢谢,
王磊



wangl...@geekplus.com.cn 



Re: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-02 文章 wangl...@geekplus.com.cn
public void invoke(ObjectNode node, Context context) throws Exception {

String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new 
MeterView(10));
meter.markEvent();
log.info("### counter: " + meter.toString() + "\t" +  meter.getCount());

如上面代码所示,在 invoke 方法中解析得到 tableName, 以 tableName 名字作为 metrics.
但这样写每一消息下来了后相当于重新定义了 这个 metrics , 又从 0 开始计数了。

谢谢,
王磊


wangl...@geekplus.com.cn 

 
Sender: kcz
Send Time: 2020-07-03 09:13
Receiver: wanglei2
Subject: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
按照你的描述 你就是少了tablename,那么你解析log 得到了tablename又做metric就好了吧



-- 原始邮件 --
发件人: 王磊2 
发送时间: 2020年7月2日 21:46
收件人: user-zh , 17610775726 <17610775...@163.com>
主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?


没有明白你说的实现方式。

我最终要得到类似的 Metrics:  myCounter_table1, myCounter_table2, ..., myCounter_tableX
但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。

谢谢,
王磊



--
发件人:JasonLee <17610775...@163.com>
发送时间:2020年7月2日(星期四) 21:12
收件人:user-zh 
主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

你把tablename传到下面metric里不就行了吗


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道:

全都是同一种类型的 metrics.
比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 
meter 类型)

谢谢,
王磊




wangl...@geekplus.com.cn


发件人: JasonLee
发送时间: 2020-07-02 16:16
收件人: user-zh
主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
是要生成不同类型的metric吗 比如counter meter ?


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:

官网上的例子:

public class MyMapper extends RichMapFunction {
private transient Counter counter;
@Override
public void open(Configuration config) {
  this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
  this.counter.inc();
  return value;
}
}

我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?

谢谢,
王磊



wangl...@geekplus.com.cn




回复: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-02 文章 wangl...@geekplus.com.cn

全都是同一种类型的 metrics.
比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 
meter 类型) 

谢谢,
王磊




wangl...@geekplus.com.cn 


发件人: JasonLee
发送时间: 2020-07-02 16:16
收件人: user-zh
主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
是要生成不同类型的metric吗 比如counter meter ?
 
 
| |
JasonLee
|
|
邮箱:17610775...@163.com
|
 
Signature is customized by Netease Mail Master
 
在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:
 
官网上的例子:
 
public class MyMapper extends RichMapFunction {
private transient Counter counter;
@Override
public void open(Configuration config) {
   this.counter = getRuntimeContext()
 .getMetricGroup()
 .counter("myCounter");
}
@Override
public String map(String value) throws Exception {
   this.counter.inc();
   return value;
}
}
 
我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?
 
谢谢,
王磊
 
 
 
wangl...@geekplus.com.cn
 


在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-02 文章 wangl...@geekplus.com.cn

官网上的例子:

public class MyMapper extends RichMapFunction {
  private transient Counter counter;
  @Override
  public void open(Configuration config) {
this.counter = getRuntimeContext()
  .getMetricGroup()
  .counter("myCounter");
  }
  @Override
  public String map(String value) throws Exception {
this.counter.inc();
return value;
  }
}

我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?

谢谢,
王磊



wangl...@geekplus.com.cn



????: ??????Flink State ?????????? state ????????????

2020-06-09 文章 wangl...@geekplus.com.cn

OrderState 
 get set ?? 

 flink ??OrderState??



wang...@geekplus.com.cn

 1048262223
?? 2020-06-09 18:11
 user-zh
?? ??Flink State ?? state 
Hi
 
 
flink??OrderStatepojo??savepoint??
 
Best,
Yichao Yang
 
 
 
 
----
??:"wangl...@geekplus.com.cn"

Flink State 增加字段后 state 还能识别吗?

2020-06-09 文章 wangl...@geekplus.com.cn

写了个简单的类会在 Flink State 中使用:

public class OrderState {
private Integer warehouseId;
private String orderNo;
private String ownerCode;
private Long inputDate;
private int orderType;
private int amount = 0;
private int status = 0;
.
}


现在程序要升级,这个类还要增加一个新的字段。从state 能正常恢复吗?
也就是 flink run -s   savepointdir   后能正常识别旧的代码保存的 state 吗?

谢谢,
王磊



wangl...@geekplus.com.cn


Re: 回复:在已有 Hadoop 外搭建 standalone 模式 HA flink 集群

2020-05-07 文章 wangl...@geekplus.com.cn

我试了下是可以的,但现在有一个访问 HDFS 的问题。

我用的 hadoop 是阿里云 EMR 管理, 在 EMR 管理的机器上可以以 hdfs://emr-cluster:8020/ 访问 HDFS

但我部署的 Flink 不属于 EMR 管理,这个地址是不能解析的,我只能写成  hdfs://active-namenode-ip:8020/ 
的形式,NameNode 丧失了 HA 的功能

有什么方式解决这个问题吗?

谢谢,
王磊



wangl...@geekplus.com.cn

Sender: Andrew
Send Time: 2020-05-07 12:31
Receiver: user-zh
Subject: 回复:在已有 Hadoop 外搭建 standalone 模式 HA flink 集群
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
 
 
 
 
---原始邮件---
发件人: "wangl...@geekplus.com.cn"

回复: 在已有 Hadoop 外搭建 standalone 模式 HA flink 集群

2020-05-06 文章 wangl...@geekplus.com.cn

看起来这个文档可以,我先试下:

https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html



wangl...@geekplus.com.cn

发件人: wangl...@geekplus.com.cn
发送时间: 2020-05-07 12:23
收件人: user-zh
主题: 在已有 Hadoop 外搭建 standalone 模式 HA flink 集群
 
现在已经有了一个 Hadoop 集群。
我想在这个 集群外(不同的机器,网络互通)部署一个 standalone 模式的 flink cluster,配置 jobmanager 的 
HA,访问和使用已有 Hadoop 的 HDFS,使用已有 Hadoop 的 zookeeper 
 
这个有参考文档怎么操作吗?
 
谢谢,
王磊
 
 
 
wangl...@geekplus.com.cn 
 


Re: Re:FlinkSQL Retraction 问题原理咨询

2020-05-05 文章 wangl...@geekplus.com.cn

Thanks  Jingsong Lee.

我用的是 MySQL,sink 表中没有任何主键或唯一键.
如果 sink 表设主键或唯一键,确实能达到只保留两条记录的效果。

我把 flink sql-client 客户端设置 SET execution.result-mode=changelog 试验了下,左边标上了是第几条 
kafka 消息导致的行为:

 +/- tms_company order_cnt 1 + zhongtong 1 2 - zhongtong 1 2 + yuantong 1 3 - 
yuantong 1 3 + yuantong 2 4 - yuantong 2 4 + yuantong 1 4 + zhongtong 1

第1条消息:执行一个 INSERT 
第2条消息:执行了 一个 DELETE, 一个  INSERT 
第3条消息:执行了一个  INSERT ON DUPLICATE UPDATE
第4条消息:执行了两个  INSERT ON DUPLICATE UPDATE


我总结这个逻辑应该是如果 回撤导致的结果变成 0 ,就会执行 delete , 否则就是INSERT ON DUPLICATE UPDATE

不知道我这样理解是否正确。

谢谢,
王磊




wangl...@geekplus.com.cn

Sender: Jingsong Li
Send Time: 2020-05-06 11:35
Receiver: user-zh
Subject: Re: Re:FlinkSQL Retraction 问题原理咨询
Hi,
 
问题一:删除数据可不单单只是retract stream的功能。upsert
stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
stream也有retract的input数据的。JDBC实现的是upsert stream的消费。
 
问题二:正确数据应该是:
1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
2  {"order_id":1,"tms_company":"yuantong"}  数据库1条记录: yuantong 1  ( 删除
zhongtong 1)
3  {"order_id":2,"tms_company":"yuantong"} 数据库1条记录: yuantong 2  (
删除yuantong 1)
4  {"order_id":2,"tms_company":"zhongtong"}   数据库2条记录: yuantong 1,
zhongtong 1( 删除yuantong 2)
 
你用了什么dialect?是不是mysql?
Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?
 
Best,
Jingsong Lee
 
On Wed, May 6, 2020 at 10:36 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
>
> 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> tms_company 是有变化的。
> 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
>
>
>
>
> wangl...@geekplus.com.cn
>
> 发件人: Michael Ran
> 发送时间: 2020-04-30 17:23
> 收件人: user-zh
> 主题: Re:FlinkSQL Retraction 问题原理咨询
>
>
>
> 指定的更新键是tms_company?
>
>
> 结果是:
> yuantong:2
> zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-30 17:08:22,"wangl...@geekplus.com.cn" 
> 写道:
> >
> >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> RDS, RDS 表没有主键,也没有唯一键。
> >
> >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> order_cnt from
> >(select order_id, LAST_VALUE(tms_company) AS tms_company from
> dwd_table group by order_id)
> > group by tms_company;
> >
> >
> >总共发送了 4 条消息,顺序如下:
> >
> >1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> >
> >2  {"order_id":1,"tms_company":"yuantong"}  数据库1条记录: yuantong 1
> (上一条记录被删除了)
> >
> >3  {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1,
> yuantong 2  (增加了条记录,没有删除)
> >
> >4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1,
> yuantong 2, yuantong 1, zhongtong 1(增加了两条记录,没有删除)
> >
> >
> >问题一:
> >第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> >
> >问题二:
> >   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> >   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> >
> >谢谢,
> >王磊
> >
> >
> >
> >wangl...@geekplus.com.cn
>
 
 
-- 
Best, Jingsong Lee


FlinkSQL Retraction 问题原理咨询

2020-04-30 文章 wangl...@geekplus.com.cn

自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 
表没有主键,也没有唯一键。

INSERT INTO table_out select  tms_company,  count(distinct order_id) as 
order_cnt from 
(select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table 
group by order_id) 
 group by tms_company;


总共发送了 4 条消息,顺序如下:

1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1

2  {"order_id":1,"tms_company":"yuantong"}  数据库1条记录: yuantong 1  (上一条记录被删除了)

3  {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2  
(增加了条记录,没有删除)

4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1, yuantong 2, 
yuantong 1, zhongtong 1(增加了两条记录,没有删除)


问题一:
第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?

问题二:
   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?

谢谢,
王磊



wangl...@geekplus.com.cn 


回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

2020-04-27 文章 wangl...@geekplus.com.cn

Thanks Leonard, 

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON DUPLICATE KEY 
吗? 
这个在源代码哪个地方呢?

谢谢,
王磊



wangl...@geekplus.com.cn 

 
发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei
 
> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, 
RetractStreamSink)
 
> 我看 
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
>  没有 Retract 方式
> 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。
 
> 如若不带 group by 直接:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
 
不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可 
 
Best,
 
Leonard Xu


FlinkSQL Upsert/Retraction 写入 MySQL 的问题

2020-04-26 文章 wangl...@geekplus.com.cn


INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1

每从 kafka 过来一条新的记录,会生成两条记录 Tuple2, 旧的被删除,新的会添加上。
但这个 Sink 到底是用到了 UpsertStream 还是 RetractStream 呢,怎么判断是 UpsertStream 还是 
RetractStream 呢?

我看 
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
 没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?



如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?




wangl...@geekplus.com.cn 



Re: Re: 实现 KafkaUpsertTableSink

2020-03-30 文章 wangl...@geekplus.com.cn

我只保留 KafkaRetractTableSourceSinkFactory  一个, KafkaRetractTableSinkBase 实现 
RetractStreamTableSink 接口,在 consumeDataStream 实现只有 True 才发送,最终 work 了。
@Override
public DataStreamSink consumeDataStream(DataStream> 
dataStream) {
   DataStream dtNeed = dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -> 
x.f1);

INSERT INTO table1 SELCET field, count(*) from table2 group by field 这是 一个 
RetractStream,结果里面会有 True/False, 通过这个过滤是可以的。
INSERT INTO table1 SELECT feild, 1 from table2  
  我理解这不是一个 RetractStream, 上面  dataStream.filter(x -> x.f0 == 
Boolean.TRUE) 的代码应该会出错,但实际上没有出错

还不是完全能理解,我再看一下吧。

谢谢,
王磊


wangl...@geekplus.com.cn 

Sender: Benchao Li
Send Time: 2020-03-31 12:02
Receiver: user-zh
Subject: Re: Re: 实现 KafkaUpsertTableSink
我理解你可以让KafkaRetractTableSourceSinkFactory的参数跟KafkaTableSourceSinkFactory的参数有不同的地方,
然后通过这个参数来区分两个不同的factory。比如加一个参数,表示这个sink是retract还是append类型之类的?
 
wangl...@geekplus.com.cn  于2020年3月31日周二 上午11:17写道:
 
> 这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成
> KafkaRetractTableSourceSinkFactory 写了一遍
> 但这个应该怎样改才合适呢?
>
> 137 private static  T
> findSingleInternal(
> 138 Class factoryClass,
> 139 Map properties,
> 140 Optional classLoader) {
> 141
> 142 List tableFactories =
> discoverFactories(classLoader);
> 143 List filtered = filter(tableFactories,
> factoryClass, properties);
> 144
> 145 if (filtered.size() > 1) {
> 146 throw new AmbiguousTableFactoryException(
> 147 filtered,
> 148 factoryClass,
> 149 tableFactories,
> 150 properties);
> 151 } else {
> 152 return filtered.get(0);
> 153 }
> 154 }
>
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: wangl...@geekplus.com.cn
> Send Time: 2020-03-31 10:50
> Receiver: user-zh
> Subject: Re: RE: 实现 KafkaUpsertTableSink
>
> 我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:
>
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
> at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
> at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
> ... 3 more
>
> 这个改怎样解决呢?
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: wxchunj...@163.com
> Send Time: 2020-03-29 10:32
> Receiver: user-zh@flink.apache.org
> Subject: RE: 实现 KafkaUpsertTableSink
> Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
> -Original Message-
> From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org
>  On Behalf Of
> Benchao Li
> Sent: Saturday, March 28, 2020 6:28 PM
> To: user-zh 
> Subject: Re: 实现 KafkaUpsertTableSink
> Hi,
> 你需要把你新增的Factory添加到 resources下的
>
> META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
>  于2020年3月28日周六 下午5:38写道:
> > 各位大佬:
> >
> > 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> > KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> > KafkaUpsertTableSink:
> >
> > KafkaUpsertTableSink
> >
> > KafkaUpsertTableSinkBase
> >
> > KafkaUpsertTableSourceSinkFactory
> >
> > KafkaUpsertTableSourceSinkFactoryBase
> >
> > MyKafkaValidator
> >
> > 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> > KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> > 呢?
> >

Re: Re: 实现 KafkaUpsertTableSink

2020-03-30 文章 wangl...@geekplus.com.cn
这个应该是得到两个 tableFactories 了。 我直接平行的把 KafkaTableSourceSinkFactory 那一套变成 
KafkaRetractTableSourceSinkFactory 写了一遍
但这个应该怎样改才合适呢?

137 private static  T findSingleInternal(
138 Class factoryClass,
139 Map properties,
140 Optional classLoader) {
141
142 List tableFactories = 
discoverFactories(classLoader);
143 List filtered = filter(tableFactories, factoryClass, 
properties);
144
145 if (filtered.size() > 1) {
146 throw new AmbiguousTableFactoryException(
147 filtered,
148 factoryClass,
149 tableFactories,
150 properties);
151 } else {
152 return filtered.get(0);
153 }
154 }


谢谢,
王磊


wangl...@geekplus.com.cn 

 
Sender: wangl...@geekplus.com.cn
Send Time: 2020-03-31 10:50
Receiver: user-zh
Subject: Re: RE: 实现 KafkaUpsertTableSink
 
我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:
 
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
... 3 more
 
这个改怎样解决呢?
 
谢谢,
王磊
 
 
 
wangl...@geekplus.com.cn 
 
Sender: wxchunj...@163.com
Send Time: 2020-03-29 10:32
Receiver: user-zh@flink.apache.org
Subject: RE: 实现 KafkaUpsertTableSink
Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
-Original Message-
From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org 
 On Behalf Of Benchao 
Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh 
Subject: Re: 实现 KafkaUpsertTableSink
Hi,
你需要把你新增的Factory添加到 resources下的
META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
 于2020年3月28日周六 下午5:38写道:
> 各位大佬:
>
> 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> KafkaUpsertTableSink:
>
> KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> 呢?
>
>
>
>
> /**
> * Searches for factories using Java service providers.
> *
> * @return all factories in the classpath */ private static 
> List discoverFactories(Optional
> classLoader) {
>try {
>   List result = new LinkedList<>();
>   ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>   ServiceLoader
>  .load(TableFactory.class, cl)
>  .iterator()
>  .forEachRemaining(result::add);
>   //todo add
>   result.add(new KafkaUpsertTableSourceSinkFactory());
>   return result;
>} catch (ServiceConfigurationError e) {
>   LOG.error("Could not load service provider for table factories.", e);
>   throw new TableException("Could not load service provider for 
> table factories.", e);
>}
>
> }
>
>
>
>
>
> 直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
> 以成功运行的。
>
> 非常感谢
>
>
>
>
>
> --
>
> Thanks
>
> venn
>
>
>
>
-- 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: RE: 实现 KafkaUpsertTableSink

2020-03-30 文章 wangl...@geekplus.com.cn

我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:

org.apache.flink.table.planner.delegation.BlinkExecutorFactory
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
... 3 more

这个改怎样解决呢?

谢谢,
王磊



wangl...@geekplus.com.cn 

 
Sender: wxchunj...@163.com
Send Time: 2020-03-29 10:32
Receiver: user-zh@flink.apache.org
Subject: RE: 实现 KafkaUpsertTableSink
Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
 
 
-Original Message-
From: user-zh-return-2640-wxchunjhyy=163@flink.apache.org 
 On Behalf Of Benchao 
Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh 
Subject: Re: 实现 KafkaUpsertTableSink
 
Hi,
 
你需要把你新增的Factory添加到 resources下的
META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
 
 于2020年3月28日周六 下午5:38写道:
 
> 各位大佬:
>
> 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> KafkaUpsertTableSink:
>
> KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> 呢?
>
>
>
>
> /**
> * Searches for factories using Java service providers.
> *
> * @return all factories in the classpath */ private static 
> List discoverFactories(Optional
> classLoader) {
>try {
>   List result = new LinkedList<>();
>   ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>   ServiceLoader
>  .load(TableFactory.class, cl)
>  .iterator()
>  .forEachRemaining(result::add);
>   //todo add
>   result.add(new KafkaUpsertTableSourceSinkFactory());
>   return result;
>} catch (ServiceConfigurationError e) {
>   LOG.error("Could not load service provider for table factories.", e);
>   throw new TableException("Could not load service provider for 
> table factories.", e);
>}
>
> }
>
>
>
>
>
> 直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
> 以成功运行的。
>
> 非常感谢
>
>
>
>
>
> --
>
> Thanks
>
> venn
>
>
>
>
 
-- 
 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Re: flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 wangl...@geekplus.com.cn

flink-table-uber-blink 下
 mvn clean install -DskipTests -Dscala-2.12 -DskipTests

不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的

谢谢, 
王磊


wangl...@geekplus.com.cn 
 
Sender: Kurt Young
Send Time: 2020-03-26 18:15
Receiver: user-zh
cc: jihongchao
Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的
flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar)
 
Best,
Kurt
 
 
On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
>
> 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar
> 这个 jar 是从哪里 build 出来的呢?
>
> 我 clone github 上的源代码,mvn clean package
> 我以为 flink-table/flink-table-planner-blink 目录下build 出的
> flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的
>  flink-table-blink_2.12-1.10.0.jar  是对应的
> 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>


flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 wangl...@geekplus.com.cn

单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar 
这个 jar 是从哪里 build 出来的呢?
 
我 clone github 上的源代码,mvn clean package
我以为 flink-table/flink-table-planner-blink 目录下build 出的 
flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的   
flink-table-blink_2.12-1.10.0.jar  是对应的
我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。

谢谢,
王磊



wangl...@geekplus.com.cn 



回复: Flink JDBC Driver是否支持创建流数据表

2020-03-24 文章 wangl...@geekplus.com.cn

参考下这个文档: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
 
下面的语法应该是不支持的:
  'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n"

下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90}
tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
+ "order_no VARCHAR,\n"
+ "status INT\n"
+ ") WITH (\n"
+ "'connector.type' = 'kafka',\n"
+ "'connector.version' = 'universal',\n"
+ "'connector.topic' = 'wanglei_test',\n"
+ "'connector.startup-mode' = 'latest-offset',\n"
+ "'connector.properties.0.key' = 'zookeeper.connect',\n"
+ "'connector.properties.0.value' = 'xxx:2181',\n"
+ "'connector.properties.1.key' = 'bootstrap.servers',\n"
+ "'connector.properties.1.value' = 'xxx:9092',\n"
    + "'update-mode' = 'append',\n"
+ "'format.type' = 'json',\n"
+ "'format.derive-schema' = 'true'\n"
+ ")");

王磊


wangl...@geekplus.com.cn
 
发件人: 赵峰
发送时间: 2020-03-24 21:28
收件人: user-zh
主题: Flink JDBC Driver是否支持创建流数据表
hi
   
Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
Connection connection = 
DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();
 
statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal', \n" +
"'connector.topic' = 'flink_im02', \n" +
"'connector.properties.group.id' = 'flink_im02_new',\n" +
"'connector.startup-mode' = 'earliest-offset', \n" +
"'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"'format.type' = 'csv',\n" +
"'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
} 
 
statement.close();
connection.close();
 
报错:
Reason: Required context properties mismatch.
 
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
 
 
 
 
赵峰


Re: Re: flinkSQL join表的历史信息保存在哪里保存多久

2020-03-12 文章 wangl...@geekplus.com.cn

Thanks, it works.


wangl...@geekplus.com.cn
 
Sender: sunfulin
Send Time: 2020-03-12 14:19
Receiver: user-zh; wanglei2
cc: jinhai.me
Subject: Re:Re: Re: flinkSQL join表的历史信息保存在哪里保存多久


这样来用:
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);




在 2020-03-12 14:11:31,"wangl...@geekplus.com.cn"  写道:
>
>这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。
>StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>StreamQueryConfig qConfig = tableEnv.queryConfig();
>
>
>
>wangl...@geekplus.com.cn 
>
> 
>Sender: jinhai wang
>Send Time: 2020-03-12 13:44
>Receiver: user-zh@flink.apache.org
>Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久
>应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
> 
> 
>在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
> 
>
>两个从 kafka 创建的表:
>
>tableA: key  valueA
>tableB: key  valueB 
>
>用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
> tableA join tableB on tableA.key = tableB.key;
>这两个表的历史数据在 flink 中存在哪里?存多久呢?
>
>比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
> 
>谢谢,
>王磊
>
>
>
>wangl...@geekplus.com.cn 
>
> 



 


Re: Re: flinkSQL join表的历史信息保存在哪里保存多久

2020-03-12 文章 wangl...@geekplus.com.cn

这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。
StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
StreamQueryConfig qConfig = tableEnv.queryConfig();



wangl...@geekplus.com.cn 

 
Sender: jinhai wang
Send Time: 2020-03-12 13:44
Receiver: user-zh@flink.apache.org
Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久
应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
 
 
在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
 

两个从 kafka 创建的表:

tableA: key  valueA
tableB: key  valueB 

用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
tableA join tableB on tableA.key = tableB.key;
这两个表的历史数据在 flink 中存在哪里?存多久呢?

比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
 
谢谢,
王磊



wangl...@geekplus.com.cn 

 


flinkSQL join 表的历史信息保存在哪里保存多久

2020-03-11 文章 wangl...@geekplus.com.cn

两个从 kafka 创建的表:

tableA: key  valueA
tableB: key  valueB 

用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
tableA join tableB on tableA.key = tableB.key;
这两个表的历史数据在 flink 中存在哪里?存多久呢?

比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
 
谢谢,
王磊



wangl...@geekplus.com.cn 


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 wangl...@geekplus.com.cn
我试了下,是可以的。

Thanks


wangl...@geekplus.com.cn
 
Sender: Kurt Young
Send Time: 2020-03-11 19:59
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
那有可能是可以的,你可以试试看

Best,
Kurt


On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn 
 wrote:
Hi Kurt,

如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗?
代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state 
存储并且再次提交任务可以被访问到直接用吗?

谢谢,
王磊



wangl...@geekplus.com.cn
 
Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
sql client 目前还不支持这个功能。

Best,
Kurt


On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn 
 wrote:
Hi Kurt, 
确实是可以 直接 flink  cancel -s 保存状态。
但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?

谢谢,
王磊
 
Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
 
Best,
Kurt
 
 
On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 有两个表:
> tableA: key  valueA
> tableB: key  valueB
>
> 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>
> 谢谢,
> 王磊
>


Re: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

2020-03-11 文章 wangl...@geekplus.com.cn

Thanks Jark,

No word to express my '囧'.


wangl...@geekplus.com.cn 

Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Hi Lei, 

The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong 
field name in the DDL. 
It should be "input_date", not "intput_date".

Best,
Jark

On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn 
 wrote:

Sorry i sent the Chinese written email to user@ 
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send message to the topic: 
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client and is null, even i  
tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000" 

How should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
发件人: wangl...@geekplus.com.cn
发送时间: 2020-03-11 17:41
收件人: user
主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

用 sql-client create 了一个 kafka table:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
然后往 kafka 这个 topic 
发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date 在 sql-clinet 端始终是 NULL. 
我把 发送的 input_date 改成 1583828700240  "2020-03-11 13:00:00"  "2020-03-11 
13:00:00.000" 也都不行。 
这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢?

谢谢,
王磊



wangl...@geekplus.com.cn


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 wangl...@geekplus.com.cn
Hi Kurt,

如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗?
代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state 
存储并且再次提交任务可以被访问到直接用吗?

谢谢,
王磊



wangl...@geekplus.com.cn
 
Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
sql client 目前还不支持这个功能。

Best,
Kurt


On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn 
 wrote:
Hi Kurt, 
确实是可以 直接 flink  cancel -s 保存状态。
但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?

谢谢,
王磊
 
Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
 
Best,
Kurt
 
 
On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 有两个表:
> tableA: key  valueA
> tableB: key  valueB
>
> 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>
> 谢谢,
> 王磊
>


回复: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

2020-03-11 文章 wangl...@geekplus.com.cn

Sorry i sent the Chinese written email to user@ 
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send message to the topic: 
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client and is null, even i  
tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000" 

How should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
发件人: wangl...@geekplus.com.cn
发送时间: 2020-03-11 17:41
收件人: user
主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

用 sql-client create 了一个 kafka table:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
然后往 kafka 这个 topic 
发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date 在 sql-clinet 端始终是 NULL. 
我把 发送的 input_date 改成 1583828700240  "2020-03-11 13:00:00"  "2020-03-11 
13:00:00.000" 也都不行。 
这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢?

谢谢,
王磊



wangl...@geekplus.com.cn


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-10 文章 wangl...@geekplus.com.cn
Hi Kurt, 
确实是可以 直接 flink  cancel -s 保存状态。
但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?

谢谢,
王磊
 
Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
 
Best,
Kurt
 
 
On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 有两个表:
> tableA: key  valueA
> tableB: key  valueB
>
> 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>
> 谢谢,
> 王磊
>


flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-10 文章 wangl...@geekplus.com.cn
有两个表:
tableA: key  valueA
tableB: key  valueB

我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?

谢谢,
王磊


join key 有重复的双流 join 怎样去重后发送到 kafka

2020-03-10 文章 wangl...@geekplus.com.cn
有两个 kafka 作为数据源的表 
order_info:
  order_no   info
order_status: 
  order_no  status

两个表的 order_no 都会有重复,来一条其中一个表的记录,会在另外一个表中找到多条记录。
我怎样实现在另外一个表中只取出与该 join key 相关的最新的一条记录并发送到 kafka 中呢?

kafka 只支持 append 模式的 sink,先把 表 group 再join 行不通。

谢谢,
王磊




Re: Re: Flink State 过期清除 TTL 问题

2019-12-09 文章 wangl...@geekplus.com.cn
Hi 唐云,

我的集群已经升到了 1.8.2,  cleanupFullSnapshot 和 cleanupInRocksdbCompactFilter 都试验了下。
但 cancel -s 停止后, 生成的 savepoint 目录还是没有变小。过程是这样的:

cancel -s 停止,savepoint 目录大小为 100M
代码变更,把原来的 setUpdateType 变为 cleanupFullSnapshot 
新的代码从 1 的 savepoint 目录恢复
新的代码运行一天左右,再 cancel -s, 新的 savepoint 目录变大

会不会是 每次 flink run -s ,从已有 savepoint 目录中恢复的数据所有的 updateTime 都变成了当前时间? 

谢谢,
王磊



wangl...@geekplus.com.cn
 
Sender: Yun Tang
Send Time: 2019-11-01 01:38
Receiver: user-zh@flink.apache.org
Subject: Re: Flink State 过期清除 TTL 问题
Hi 王磊
 
从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置 
cleanupFullSnapshot,这样你在执行full 
snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。
 
另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2]
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
 
祝好
唐云
 
 
On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn"  
wrote:
 
flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。
我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗?

谢谢,
王磊

    

wangl...@geekplus.com.cn

 


state TTL 变更问题

2019-11-15 文章 wangl...@geekplus.com.cn

有一个程序用到了 state, 设置 TTL 为3天。
运行一段时间后 cancel -s 停止,把过期时间设为 7 天,再从 state 文件恢复运行。

cancel -s 停止时生成的文件里面的所有 key,TTL 都会变成 7 天吗? 还是依然是 3 天?

谢谢,
王磊 





wangl...@geekplus.com.cn


Re: Re: 怎样把 state 定时写到外部存储

2019-10-31 文章 wangl...@geekplus.com.cn
Hi Congxian,

以 sink 的方式写出去最终也要落在某个地方才能供查询使用啊。
我们的 case 是写到 MySQL 中



wangl...@geekplus.com.cn
 
Sender: Congxian Qiu
Send Time: 2019-11-01 10:10
Receiver: user-zh
Subject: Re: 怎样把 state 定时写到外部存储
好奇为什么要把 State 定期写到外存呢?是外部系统需要使用这些 State 吗?那为什么不把 State 以 sink 的方式写出去呢?
 
Best,
Congxian
 
 
Jun Zhang <825875...@qq.com> 于2019年10月31日周四 上午10:36写道:
 
> 是否可以注册一个定时器?
>
>
> 你看看这个文章,是否对你有帮助
>
>
> https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA
>  在2019年10月31日 10:16,wangl...@geekplus.com.cn 写道:
>
>
> 消息驱动,QPS 很高, 每一条消息来都会更改 state 值,如果每一条消息来都写外部存储下游撑不住。
> 有没有什么方式可以定期读 state 写到外部存储?
> 我现在用的是 Flink1.7.2 版本。
>
>
>
>
>
> wangl...@geekplus.com.cn


Re: Re: Flink State 过期清除 TTL 问题

2019-10-31 文章 wangl...@geekplus.com.cn
谢谢,了解了。

王磊



wangl...@geekplus.com.cn
 
Sender: Yun Tang
Send Time: 2019-11-01 01:38
Receiver: user-zh@flink.apache.org
Subject: Re: Flink State 过期清除 TTL 问题
Hi 王磊
 
从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置 
cleanupFullSnapshot,这样你在执行full 
snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。
 
另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2]
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
 
祝好
唐云
 
 
On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn"  
wrote:
 
flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。
我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗?

谢谢,
王磊

    

wangl...@geekplus.com.cn

 


Flink State 过期清除 TTL 问题

2019-10-30 文章 wangl...@geekplus.com.cn
flink-1.7.2, 用下面的清除策略:StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

程序更新时用 savepoint 的方式, flink cancel -s 保存到 savepoint 目录,再从 savepoint 目录恢复。
我的程序运行了一段时间,已经远大于 3 天,每次 flink cancel -s 后生成的 savepoint 目录不断变大。是过期清除策略没生效吗?

谢谢,
王磊



wangl...@geekplus.com.cn


怎样把 state 定时写到外部存储

2019-10-30 文章 wangl...@geekplus.com.cn

消息驱动,QPS 很高, 每一条消息来都会更改 state 值,如果每一条消息来都写外部存储下游撑不住。
有没有什么方式可以定期读 state 写到外部存储? 
我现在用的是 Flink1.7.2 版本。





wangl...@geekplus.com.cn


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 wangl...@geekplus.com.cn

抱歉,是我搞错了。
实际上是写入数据的。我在 windows 下做测试,刷新下文件的大小始终是 0 , 只有编辑看下那个文件显示的文件大小才会变更。



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-09 10:17
Receiver: user-zh
Subject: Re: Re: CsvTableSink 目录没有写入具体的数据
没数据是因为没有trigger执行,  参考sample code from doc(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
)
 
// get a StreamTableEnvironment, works for BatchTableEnvironment
equivalentlyStreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
// create a TableSinkTableSink sink = new
CsvTableSink("/path/to/file", fieldDelim = "|");
// register the TableSink with a specific schemaString[] fieldNames =
{"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT,
Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable",
fieldNames, fieldTypes, sink);
// compute a result Table using Table API operators and/or SQL
queriesTable result = ...// emit the result Table to the registered
TableSinkresult.insertInto("CsvSinkTable");
// execute the program
 
加上 tableEnv.execute();
 
 
wangl...@geekplus.com.cn  于2019年8月9日周五 上午9:42写道:
 
>
> 我接入了一个 RocketMQ 的流作为输入。
>
>
>  DataStream> ds = env.addSource(new
> RocketMQSource(
> 
> System.out.println(res);
> return res;
> }
> });
>
>
> tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
> TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
> String[] fieldNames = {"num"};
> TypeInformation[] fieldTypes = {Types.INT};
> tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
> tableEnv.sqlUpdate(
> "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: CsvTableSink 目录没有写入具体的数据
> 完整代码发一下
>
> wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
>
> >
> > 我按官网上的
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > 例子写的代码
> > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
>


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 wangl...@geekplus.com.cn
   
我接入了一个 RocketMQ 的流作为输入。


 DataStream> ds = env.addSource(new 
RocketMQSource(

System.out.println(res);
return res;
}
});


tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, 
pick_list_no, sku_code");

TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
String[] fieldNames = {"num"};
TypeInformation[] fieldTypes = {Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, 
csvSink);
tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT pick_task_id FROM 
t_pick_task");



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-08 21:01
Receiver: user-zh
Subject: Re: CsvTableSink 目录没有写入具体的数据
完整代码发一下
 
wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
 
>
> 我按官网上的
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> 例子写的代码
> 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
>
>
>
> wangl...@geekplus.com.cn
>


CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 wangl...@geekplus.com.cn

我按官网上的 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
  例子写的代码
但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?



wangl...@geekplus.com.cn