This should be the time configured by the withIdleStateRetentionTime parameter. See the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
On 2020/3/12 at 12:37 PM, "wangl...@geekplus.com.cn" wrote:
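The idle-state-retention suggestion above can be applied programmatically as well. A minimal sketch on Flink 1.10 (the 12h/24h values are illustrative choices, not from the thread):

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Keep join state for at least 12 hours; state idle longer than
        // 24 hours may be cleaned up, after which late-arriving join keys
        // (e.g. a key showing up a month later) can no longer match.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}
```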
Hi folks,
I have a question related to the new memory configuration introduced in Flink
1.10. Has anyone encountered a similar problem?
I'm trying to make use of *taskmanager.memory.process.size* configuration
key in combination with mesos session cluster, but I get an error like this:
2020-03-11
Two tables created from Kafka:
tableA: key valueA
tableB: key valueB
A job submitted with Flink SQL: select tableA.key, tableA.valueA, tableB.valueB from
tableA join tableB on tableA.key = tableB.key;
Where is the historical data of these two tables stored in Flink, and for how long?
For example, if key1 appears in tableA first and only much later (a month) does key1 appear in tableB, can they still be joined at that point?
Thanks,
王磊
Yes,
you should take a look at [1]. There are other config options you will need too.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html
Best,
Jingsong Lee
On Thu, Mar 12, 2020 at 12:26 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
> Hi Jingsong,
>
> So I can write
I tried it, and it works.
Thanks
wangl...@geekplus.com.cn
Sender: Kurt Young
Send Time: 2020-03-11 19:59
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: Can a Flink SQL join store state and recover data from state?
Then it might work; you can try it and see.
Best,
Kurt
On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn
wrote:
Hi Jingsong,
So I can write the code as follows?
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
You would probably have to modify the ContinuousFileMonitoringFunction source code to support multiple paths.
王智 wrote on Wed, Mar 4, 2020 at 6:34 PM:
> My requirement is case 2; right now I am using execEnv.createInput(inputFormat()).
>
> I will go try env.addSource(new InputFormatSourceFunction(..)...).
>
> Thanks!
>
> Original email
>
> From: "JingsongLee" <lzljs3620...@aliyun.com.INVALID>
>
Hi wanglei,
If you are using Flink 1.10, you can set "state.backend=rocksdb" in
"TableConfig.getConfiguration()".
And you can find related config options here[1].
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html
Jingsong Lee
On Thu, Mar 12, 2020 at 11:15 AM
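Jingsong's suggestion could look roughly like this on Flink 1.10. This is a sketch; "rocksdb" follows the thread, but the checkpoint directory value is a placeholder:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StateBackendViaTableConfig {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Cluster-level options can be put into the TableConfig's Configuration.
        tEnv.getConfig().getConfiguration().setString("state.backend", "rocksdb");
        tEnv.getConfig().getConfiguration().setString(
                "state.checkpoints.dir", "hdfs:///flink/checkpoints"); // placeholder path
    }
}
```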
Hi Aven,
Using static fields can be tricky, because only code in the same task runs in the same classloader. I have seen people try to use a static field as a global Map
store; that actually only has the right semantics when the parallelism is 1.
As for running some user code during the startup lifecycle, that is exactly what RichFunction's open
method was designed for. It depends on your actual business logic, but there may be no need to make it this convoluted :)
Best,
tison.
aven.wu wrote on Thu, Mar 12, 2020 at 10:54 AM:
> Hello
>
>
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate()...
Is there a way I can set the state backend like in a normal streaming program, as
follows: StreamExecutionEnvironment
Hi Vitaliy,
You can specify a yarn queue by either setting the configuration option
'yarn.application.queue' [1], or using the command line option '-qu' (or
'--queue') [2].
Thank you~
Xintong Song
[1]
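A sketch of both options (queue name and jar path are placeholders):

```shell
# Option 1: command-line flag at submission time
./bin/flink run -m yarn-cluster -qu my-queue ./my-job.jar

# Option 2: configuration option in conf/flink-conf.yaml
#   yarn.application.queue: my-queue
```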
Hello
One more question: besides the operator's open method, is there anywhere else this parameter can be obtained? Or is there a stage in the JobGraph startup lifecycle where some user code can be invoked?
My requirement is to initialize some static class fields from the command-line arguments. The global static class is called by operators to perform some common functionality. Initializing it in the open method is not very elegant, and to guarantee availability it has to be called in every operator's open method; for non-Rich operators, using static methods becomes a problem.
Best
Aven
From: zhisheng
Sent: March 11, 2020 21:16
To: user-zh
Subject: Re: About broadcasting command-line parameters in Flink
Hi Benchao,
Great feedback!
1) Full-snapshot bootstrap:
In the first version the community plans to introduce a way for Flink SQL to connect to MySQL directly and fetch the binlog; that approach can obtain the full snapshot plus the incremental binlog, with consistency guarantees.
Combining a full DB snapshot with MQ binlog will be future work; the main difficulty is how to switch smoothly from the full snapshot to the incremental MQ offset.
2) Auto-generated watermarks:
This is also on the roadmap.
3) Storing the binlog as state, loading the full snapshot once and then accepting only increments:
I understand this to be a requirement similar to (1). After Flink SQL connects
Benchao is right.
Nested fields cannot be accessed directly; you have to reference them level by level.
On Thu, 12 Mar 2020 at 00:45, Benchao Li wrote:
> Hi 周炎,
>
> Your `date` and `time` are both fields inside a nested structure; you need to reference them as request.`value`.`date` and
> request.`value`.`time`.
>
> 周炎 wrote on Wed, Mar 11, 2020 at 5:42 PM:
>
> > The DDL is as follows:
> > CREATE TABLE ods_usage_naga_dsp_filter (
> >
Hi Fabian,
We ran into the same issue. We modified the reporter to emit the metrics in
chunks and it worked fine after. Would be interested in seeing a ticket on
this as well.
- Steve
On Wed, Mar 11, 2020 at 5:13 AM Chesnay Schepler wrote:
> Please open a JIRA; we may have to split the
Hi.
Did anyone encounter problems with sending metrics via the Datadog HTTP
reporter?
My setup is Flink 1.8.2 deployed on k8s with 1 job manager and 10
task managers.
After every version deploy I see metrics on my dashboard, but after a few minutes
they stop being sent from all task managers
Hi,
How can I specify a yarn queue when I start a new job programmatically?
Regards,
Vitaliy
Hi Morgan,
> I am interested in knowing more about the failure detection mechanism
used by Flink, unfortunately information is a little thin on the ground and
I was hoping someone could shed a little light on the topic.
It is probably best to look into the implementation (see my answers below).
Hi 周炎,
Your `date` and `time` are both fields inside a nested structure; you need to reference them as request.`value`.`date` and
request.`value`.`time`.
周炎 wrote on Wed, Mar 11, 2020 at 5:42 PM:
> The DDL is as follows:
> CREATE TABLE ods_usage_naga_dsp_filter (
> request row<`value` row<`date` varchar, reqid varchar, source varchar, `time` varchar, `filter`
> array<...>>>,
>
Hi,
Thanks Jark for starting this discussion; this feature is a very important extension for Flink SQL.
I have filled in the survey; a few more thoughts:
1. When connecting to binlog, I hope there can be a full-snapshot bootstrap, so that inside Flink we get a real-time state of the whole table, which other tables can conveniently join against.
2. I hope watermarks can be generated automatically, to keep onboarding cost as low as possible. In some scenarios other append-log
data is joined with a table maintained in real time; in others, two dynamic tables built from binlogs are joined with each other.
3.
Hi Flavio,
We have implemented our own flink operator, the operator will start a flink
job cluster (the application jar is already packaged together with flink in
the docker image). I believe Google's flink operator will start a session
cluster, and user can submit the flink job via REST. Not
http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-11-133919.png
It seems per-job mode is not supported yet, alas.
zhisheng wrote on Wed, Mar 11, 2020 at 9:31 PM:
> OK, I'll go take a look first, thanks
>
> Kurt Young wrote on Wed, Mar 11, 2020 at 9:30 PM:
>
>> Check out https://github.com/ververica/flink-sql-gateway
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11,
OK, I'll go take a look first, thanks
Kurt Young wrote on Wed, Mar 11, 2020 at 9:30 PM:
> Check out https://github.com/ververica/flink-sql-gateway
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 9:26 PM zhisheng wrote:
>
> > hi, Kurt Young
> >
> > Besides using the sql-client, is there another way to execute pure SQL? Usually we are not allowed to connect from a local machine directly to the production environment, nor to run things directly on the production machines
> >
Check out https://github.com/ververica/flink-sql-gateway
Best,
Kurt
On Wed, Mar 11, 2020 at 9:26 PM zhisheng wrote:
> hi, Kurt Young
>
> Besides using the sql-client, is there another way to execute pure SQL? Usually we are not allowed to connect from a local machine directly to the production environment, nor to run the
> sql-client directly on the production machines
>
> Kurt Young wrote on Wed, Mar 11, 2020 at 7:59 PM:
>
> > Then it might work; you can try it and see.
> >
> >
hi, Kurt Young
Besides using the sql-client, is there another way to execute pure SQL? Usually we are not allowed to connect from a local machine directly to the production environment, nor to run the
sql-client directly on the production machines
Kurt Young wrote on Wed, Mar 11, 2020 at 7:59 PM:
> Then it might work; you can try it and see.
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> > Hi Kurt,
> >
> >
Thank you all.
I will change the configuration file and deploy next week.
Best regards
Eyal Peer
From: Chesnay Schepler
Sent: Wednesday, March 11, 2020 11:10 AM
To: Yang Wang ; miki haiat
Cc: Eyal Pe'er ; Rafi Aroch ;
user ; StartApp R Data Platform
Subject: Re: Setting app Flink logger
hi, aven.wu
You can use ParameterTool
to obtain the arguments passed in, then call env.getConfig().setGlobalJobParameters(parameterTool);
inside an operator, in the open method, the configuration can then be retrieved via
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
aven.wu wrote on Wed, Mar 11, 2020 at 3:42 PM:
> Hi all!
> I ran into a problem: when submitting a job with flink run
>
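The steps zhisheng describes can be sketched end to end like this (Flink 1.10 APIs; the "es.host" key and values are made up for illustration):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalParamsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parse the command-line args on the client side...
        ParameterTool params = ParameterTool.fromArgs(args);
        // ...and register them as global job parameters, shipped to every TaskManager.
        env.getConfig().setGlobalJobParameters(params);
        env.fromElements("a", "b")
           .map(new RichMapFunction<String, String>() {
               private transient String esHost;
               @Override
               public void open(Configuration cfg) {
                   // Retrieve the broadcast parameters on the TaskManager side.
                   ParameterTool p = (ParameterTool) getRuntimeContext()
                           .getExecutionConfig().getGlobalJobParameters();
                   esHost = p.get("es.host", "localhost"); // hypothetical key
               }
               @Override
               public String map(String v) { return v + "@" + esHost; }
           })
           .print();
        env.execute("global-params-sketch");
    }
}
```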
Then it might work; you can try it and see.
Best,
Kurt
On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
> Hi Kurt,
>
> If I don't use the sql-client and instead write the table registration, SQL join, etc. in Java code packaged into a jar, can I get recovery from state?
> The code contains no state declarations, TTL definitions, or the like. Will table data that already existed before the job was cancelled with cancel -s be stored in state
>
Thanks Jark,
No words to express my '囧'.
wangl...@geekplus.com.cn
Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: How should a timestamp value be written in JSON so that Flink SQL recognizes it?
Hi Lei,
The "2020-03-11T13:00:00.123Z" format is correct, but you
Hi Kurt,
If I don't use the sql-client and instead write the table registration, SQL join, etc. in Java code packaged into a jar, can I get recovery from state?
The code contains no state declarations, TTL definitions, or the like. Will table data that already existed before cancel -s
be stored in state and be directly accessible when the job is submitted again?
Thanks,
王磊
wangl...@geekplus.com.cn
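For reference, the cancel-with-savepoint flow being discussed looks roughly like this from the CLI (job id and paths are placeholders):

```shell
# Take a savepoint while cancelling the running job
./bin/flink cancel -s hdfs:///flink/savepoints <job-id>

# Resubmit the (unchanged) job, restoring from that savepoint
./bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxx ./my-sql-job.jar
```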
Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc:
Hi Lei,
The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong
field name in the DDL.
It should be "input_date", not "intput_date".
Best,
Jark
On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
>
> Sorry i sent the Chinese written
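Jark's point can be checked with plain java.time: the ISO-8601/RFC 3339 string parses directly. This is just an illustration of the expected string shape, not Flink's internal parsing code:

```java
import java.time.Instant;

public class Iso8601Check {
    public static void main(String[] args) {
        // Date, 'T', time with milliseconds, 'Z' for UTC -- the shape that
        // works for a TIMESTAMP(3) field in the JSON format.
        Instant ts = Instant.parse("2020-03-11T13:00:00.123Z");
        System.out.println(ts.toEpochMilli());
    }
}
```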
Hi Lei,
The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong
field name in the DDL.
It should be "input_date", not "intput_date".
Best,
Jark
On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
>
> Sorry i sent the Chinese written
Sorry, I sent the Chinese-language email to user@.
Let me translate it into English.
I created a table using the sql-client from a kafka topic:
CREATE TABLE order_status (
out_order_code VARCHAR,
intput_date TIMESTAMP(3),
owner_code VARCHAR,
status INT
) WITH (
The DDL is as follows:
CREATE TABLE ods_usage_naga_dsp_filter (
request row<`value` row<`date` varchar, reqid varchar, source varchar, `time` varchar, `filter` array<...>>>,
event_ts as to_timestamp(concat(`date`,`time`),'yyyy-MM-ddHH:mm:ss'),
WATERMARK FOR event_ts AS event_ts - interval '60' second
)WITH (
'connector.type' = 'kafka',
'format.fail-on-missing-field'='false',
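What the computed column does can be illustrated with plain java.time: `date` and `time` are concatenated with no separator and parsed with a matching pattern. (This is a sketch using the same Java pattern letters the SQL above relies on, not Flink's to_timestamp implementation; the sample values are made up.)

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class ConcatTimestampCheck {
    public static void main(String[] args) {
        // e.g. `date` = "2020-03-11", `time` = "13:00:00"
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss");
        LocalDateTime ts = LocalDateTime.parse("2020-03-11" + "13:00:00", fmt);
        System.out.println(ts);
    }
}
```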
Created a kafka table with the sql-client:
CREATE TABLE order_status (
out_order_code VARCHAR,
intput_date TIMESTAMP(3),
owner_code VARCHAR,
status INT
) WITH (
'connector.type' = 'kafka',
'format.type' = 'json', 'format.derive-schema' = 'true' )
> The second reason is this query need to scan the whole table. I think we
can do better :-)
Not necessarily. You said all the changes will trigger a DDB stream; you
can use Flink to consume such a stream incrementally.
For the 1st problem, I think you can use DataStream API and register a
timer
Sorry I wanted to mention https://github.com/lyft/flinkk8soperator (I don't
know which one of the 2 is better)
On Wed, Mar 11, 2020 at 10:19 AM Flavio Pompermaier
wrote:
> Have you tried to use existing operators such as
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator or
>
Have you tried to use existing operators such as
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator or
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator?
On Wed, Mar 11, 2020 at 4:46 AM Xintong Song wrote:
> Hi Eleanore,
>
> That does't sound like a scaling issue. It's
Please open a JIRA; we may have to split the Datadog report into several
chunks.
On 09/03/2020 07:47, Fanbin Bu wrote:
quote from the following link:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Query-named-operator-exceeds-80-characters-td24807.html#a24818
"This is
@Eyal:
The image you are using is for 1.9.2, but the logging configuration you
fetched was from master.
In 1.9.2 we use Log4j1, but on master we switched to Log4j2 instead,
which uses a different configuration syntax. Log4j1 pretty much ignores
the entire file, causing the error.
Please
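For reference, the two configuration syntaxes differ like this (minimal sketches, not the full Flink default configs):

```properties
# Log4j 1 (Flink 1.9.x) -- log4j.properties
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Log4j 2 (master at the time) -- the properties syntax is different, e.g.:
#   rootLogger.level = INFO
#   rootLogger.appenderRef.console.ref = ConsoleAppender
```

This is why a Log4j 2-style file is effectively ignored by a Log4j 1 image.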
>
> About the problem, we have 2 choices. The first one is using Flink as
> described in this email thread. The second one is using AWS Lambda
> triggered by the CDC stream that computes the latest 15 days of records, which is a
> workaround solution and looks less elegant than Flink to me.
>
>
Currently
Hi Kurt,
What you said is the 1st reason.
The second reason is that this query needs to scan the whole table. I think we
can do better :-)
Best,
Jiawei
On Wed, Mar 11, 2020 at 10:52 AM Kurt Young wrote:
> Hi Jiawei,
>
> Sorry I still didn't fully get your question. What's wrong with your
> proposed
I am using flink-1.10, but I added flink-json-1.9.1.jar and
flink-sql-connector-kafka_2.11-1.9.1.jar to the lib directory.
After changing to flink-json-1.10.0.jar and
flink-sql-connector-kafka_2.12-1.10.0.jar, it works.
But I have no idea why the YAML way works when I use flink-json-1.9.1.jar and
Hi Lei,
Yes. If you are creating a Kafka table, then the kafka connector jar and
some format jars are required.
That's weird. If the DDL fails, the YAML way should fail with the same
exception, unless some connector property values differ.
Could you share the detailed exception stack?
Hi all!
I ran into a problem: when submitting a job with flink run,
the command-line arguments are just the ones received by the main method, and I don't know how to obtain them on the TaskManager. As a result, connection configs such as ES and MySQL have to be read from an external system (HDFS, files). Does Flink have a way to broadcast command-line arguments like this, and if not, would such a feature be considered?
Best
Aven
Hi,
Does Flink have an active-fail mechanism similar to Storm's?
If not, is there a good way to implement one? For example, using state to store the failed records?
Thanks for your reply.
Sun.Zhu
Email: 17626017...@163.com
Hi all,
The Flink community is going to integrate some popular Change Data Capture
(CDC) tools. We would like to support reading and processing some common
binlog/changelog data in Flink SQL in the next release. We hope this survey
can help identify the most common cases and prioritize our
Hi all,
The Flink community is currently integrating with some CDC (Change Data Capture) tools, hoping to support reading and processing common binlog
data in the next release, so we are surveying which CDC tools you mainly use today.
Please fill in the survey; your feedback is very important to us, thank you!
http://apacheflink.mikecrm.com/wDivVQ1
You are also welcome to discuss ideas, requirements, and expectations for Flink's CDC integration in this thread.
Best,
Jark
Hi Jark,
I have tried to use the CREATE TABLE DDL.
First ./bin/sql-client.sh embedded, then create a table from a kafka topic, and
it tells me the table has been created.
But when I query with select * from tableName, there's an error:
[ERROR] Could not execute SQL statement. Reason: