回复: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema

2020-01-03 文章 aven . wu
Hi Jingsong
感谢指点,使用DataStream 解决了我目前的问题。
对于RowTypeInfo的设置可能有些隐晦(指在创建Datastream时就需要指定)。
希望之后对tableenv.registerStream 
API能有更好更直接的方式来设置RowTypeInfo以及一些相关可能的信息。(包括注册Datastream, 
Datastream, Datastream)

Best,
Aven

发件人: JingsongLee
发送时间: 2019年12月31日 17:03
收件人: user-zh
主题: Re: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

Hi aven,

这是个合理的需求。
现在的问题是:
- Flink table只支持Row, Pojo, Tuple, CaseClass作为结构化的数据类型。
- 
而你的类型是JSONObject,它其实也是一个结构化的数据类型,但是目前Flink不支持它,所以可以考虑有这样的DeserializationSchema机制来支持它。

但是我理解其实没有差别多少,比如你提供RowDeserializationSchema,其实就是JSONObject到Row的转换,那你完全可以把这个套在DataStream.map中,把它转换成Flink
 table支持的结构化类型。

Best,
Jingsong Lee


--
From:aven.wu 
Send Time:2019年12月31日(星期二) 14:09
To:user-zh@flink.apache.org 
Subject:回复: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

你好!
“把 JSONObject类型定义成object类型” 可以解决在确定字段和类型的情况下并且需要编码到程序中。
如果能开放这部分的能力,可以不通过编码(新增POJO)的方式来完成一个Datastream 到 stream 的table注册。

best wish
发送自 Windows 10 版邮件应用

发件人: Terry Wang
发送时间: 2019年12月30日 12:37
收件人: user-zh@flink.apache.org
主题: Re: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

你这种需求的一种解决思路,可以把 
JSONObject类型定义成object类型,然后注册成table之后通过一个UDTF把JSONObject转换成特定的schema。

Best,
Terry Wang



> 2019年12月27日 19:56,aven.wu  写道:
> 
> StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 
> 是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema.
> 
> 
> 发送自 Windows 10 版邮件应用
> 





Re:Re: Re: SQL层应用维表join jdbc的时候,请问怎么动态感知维表数据的变化呢?

2020-01-03 文章 amenhub
hi BenChao,


正是您所说的计算列特性,期待1.10版本的正式发布!非常感谢!


祝好!







在 2020-01-04 15:35:24,"Benchao Li"  写道:
>hi 世民,
>
>这个错误的意思是你的stream table里面需要有一个处理时间字段,目前为止我大概了解的有这么几种方法可以产生:
>1. 如果是从DataStream注册为Table的话,可以用:
>https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
>2. 如果是用TableSource注册的Table的话,可以用:
>https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html#using-a-tablesource
>3. 即将发布的1.10里面支持计算列,可以通过DDL直接声明一个处理时间字段:
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html
>
>此外,维表join的场景对mysql没有要求要支持Temporal Table,这个是Flink
>SQL负责解析处理的,最终请求mysql的是一个普通的select语句。
>
>amenhub  于2020年1月4日周六 下午1:55写道:
>
>>
>>
>> hi Benchao,
>>
>>
>> 我明白你的意思,我认真在看官方文档学习flink相关知识,知道目前temporal table join只支持processing-time,
>> 但是当我使用给出的join sql例子写法时,报出这个异常,[  Column 'proctime' not found in table 'o'
>> ],这个问题是传统的通过别名 [ o ]去找 [ o ]表中的proctime列,但是显然表中是没有proctime列的,
>> 请问我该怎么解决这个问题呢?我应该去了解熟悉哪方面的知识?Mysql支持ANSI-2011标准的Temporal table语义吧,请赐教~
>>
>>
>> 祝好
>>
>>
>>
>>
>>
>>
>> 在 2020-01-04 12:10:34,"Benchao Li"  写道:
>> >hi 世民,
>> >
>> >邮件列表里不支持直接发送图片,你可以用一些图床工具来发送图片。
>>
>> >根据你的描述,我猜测你应该是join维表的语法写的不对,写成了普通的join的方式。这种情况下,会把jdbc的表解析成`JDBCInputFormat`,一次性读取全部数据。
>> >维表join的SQL写法如下所示:
>> >
>> >SELECT
>> >  o.amout, o.currency, r.rate, o.amount * r.rateFROM
>> >  Orders AS o*  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>> >*  ON r.currency = o.currency
>> >
>> >详细内容可以参考文档:
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins
>> >
>> >刘世民  于2020年1月4日周六 上午11:27写道:
>> >
>> >> hi~
>> >> 如图所示,在做kafka和jdbc
>> >> join的时候,jdbc数据全量加载并为Finished状态,这种情况下请问怎么感知jdbc表的数据更新呢?还是我哪里的配置不对,还请赐教
>> >>
>> >> 小白敬上~
>> >>
>> >>
>> >>
>> >>
>> >
>> >
>> >--
>> >
>> >Benchao Li
>> >School of Electronics Engineering and Computer Science, Peking University
>> >Tel:+86-15650713730
>> >Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re:Re: SQL层应用维表join jdbc的时候,请问怎么动态感知维表数据的变化呢?

2020-01-03 文章 amenhub


hi Benchao,


我明白你的意思,我认真在看官方文档学习flink相关知识,知道目前temporal table join只支持processing-time,
但是当我使用给出的join sql例子写法时,报出这个异常,[  Column 'proctime' not found in table 'o' 
],这个问题是传统的通过别名 [ o ]去找 [ o ]表中的proctime列,但是显然表中是没有proctime列的,
请问我该怎么解决这个问题呢?我应该去了解熟悉哪方面的知识?Mysql支持ANSI-2011标准的Temporal table语义吧,请赐教~


祝好






在 2020-01-04 12:10:34,"Benchao Li"  写道:
>hi 世民,
>
>邮件列表里不支持直接发送图片,你可以用一些图床工具来发送图片。
>根据你的描述,我猜测你应该是join维表的语法写的不对,写成了普通的join的方式。这种情况下,会把jdbc的表解析成`JDBCInputFormat`,一次性读取全部数据。
>维表join的SQL写法如下所示:
>
>SELECT
>  o.amout, o.currency, r.rate, o.amount * r.rateFROM
>  Orders AS o*  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>*  ON r.currency = o.currency
>
>详细内容可以参考文档:
>https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins
>
>刘世民  于2020年1月4日周六 上午11:27写道:
>
>> hi~
>> 如图所示,在做kafka和jdbc
>> join的时候,jdbc数据全量加载并为Finished状态,这种情况下请问怎么感知jdbc表的数据更新呢?还是我哪里的配置不对,还请赐教
>>
>> 小白敬上~
>>
>>
>>
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: SQL层应用维表join jdbc的时候,请问怎么动态感知维表数据的变化呢?

2020-01-03 文章 Benchao Li
hi 世民,

邮件列表里不支持直接发送图片,你可以用一些图床工具来发送图片。
根据你的描述,我猜测你应该是join维表的语法写的不对,写成了普通的join的方式。这种情况下,会把jdbc的表解析成`JDBCInputFormat`,一次性读取全部数据。
维表join的SQL写法如下所示:

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rateFROM
  Orders AS o*  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
*  ON r.currency = o.currency

详细内容可以参考文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins

刘世民  于2020年1月4日周六 上午11:27写道:

> hi~
> 如图所示,在做kafka和jdbc
> join的时候,jdbc数据全量加载并为Finished状态,这种情况下请问怎么感知jdbc表的数据更新呢?还是我哪里的配置不对,还请赐教
>
> 小白敬上~
>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


SQL层应用维表join jdbc的时候,请问怎么动态感知维表数据的变化呢?

2020-01-03 文章 刘世民
hi~
如图所示,在做kafka和jdbc 
join的时候,jdbc数据全量加载并为Finished状态,这种情况下请问怎么感知jdbc表的数据更新呢?还是我哪里的配置不对,还请赐教


小白敬上~

Re: 使用influxdb作为flink metrics reporter

2020-01-03 文章 Yun Tang
Hi 张江


  *   Retention policy 需要现在InfluxDB端创建,InfluxDBReporter不会自行创建不存在的 retention 
policy.
  *   kafka的一些metrics在使用influxDB reporter的时候,会出现一些cast exception,可以参考 
[1],在Flink-1.9 版本下可以忽略这些异常。

[1] https://issues.apache.org/jira/browse/FLINK-12147

祝好
唐云

From: 张江 
Sent: Friday, January 3, 2020 21:22
To: user-zh@flink.apache.org 
Subject: 使用influxdb作为flink metrics reporter

大家好,


我按照官网所介绍的flink metrics reporter设置,选用了influxdb,进行了如下设置:
metrics.reporter.influxdb.class:org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.username:flink-metrics
metrics.reporter.influxdb.password:qwerty
metrics.reporter.influxdb.retentionPolicy:one_hour
但是,启动flink作业(on yarn per job模式)和flinxdb后,发现一直报错:
error  [500] - "retention policy not found: one_hour" {"log_id": "OK6nejJI000", 
"service": "httpd"} [httpd] 10.90.*.* - flinkuser [03/Jan/2020:19:35:58 +0800] 
"POST /write? db=flink=one_hour=n=one HTTP/1.1" 500 49 
"-" "okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165


我使用的是 flink 1.9.1,influxdb版本是1.79.


而且,当我不设置retentionPolicy时,还是会报错,提示:
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException:
 partial write: unable to parse 
"taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id=
 
cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02
 value=? 157805124760500": invalid boolean


求问各位大佬,这些问题怎么解决?
谢谢


祝好,





单个算子处理一条数据的真实处理时间

2020-01-03 文章 张江
大家好,



之前问过类似的问题,可能表达不太清楚。向各位大佬再请教一下。

有人知道怎么获取以下的flink metrics么:




这个是flink forward asia 
2019会议上马庆祥老师讲的flink动态资源调整里的内容。但我自己在flink官网上没有看到metrics有这个指标信息。

谢谢。




祝好,







使用influxdb作为flink metrics reporter

2020-01-03 文章 张江
大家好,


我按照官网所介绍的flink metrics reporter设置,选用了influxdb,进行了如下设置:
metrics.reporter.influxdb.class:org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.username:flink-metrics
metrics.reporter.influxdb.password:qwerty 
metrics.reporter.influxdb.retentionPolicy:one_hour
但是,启动flink作业(on yarn per job模式)和flinxdb后,发现一直报错:
error  [500] - "retention policy not found: one_hour" {"log_id": "OK6nejJI000", 
"service": "httpd"} [httpd] 10.90.*.* - flinkuser [03/Jan/2020:19:35:58 +0800] 
"POST /write? db=flink=one_hour=n=one HTTP/1.1" 500 49 
"-" "okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165


我使用的是 flink 1.9.1,influxdb版本是1.79.


而且,当我不设置retentionPolicy时,还是会报错,提示:
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException:
 partial write: unable to parse 
"taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id=
 
cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02
 value=? 157805124760500": invalid boolean


求问各位大佬,这些问题怎么解决?
谢谢


祝好,