ML/DL via Flink

2020-04-27 Thread m@xi
Hello Flinkers,

I am building a *streaming* prototype system on top of Flink, and ideally I
want to enable ML training (if possible, DL) in Flink. It would be nice to
lay out all the existing libraries that provide primitives enabling the
training of ML models.

I assume it is more efficient to do all the training in Flink (somehow)
rather than (re)training a model in TensorFlow (or PyTorch) and porting it
to a Flink job. For instance,
https://stackoverflow.com/questions/59563265/embedd-existing-ml-model-in-apache-flink
Especially in streaming ML systems, the training and the serving should both
happen in an online fashion.
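
To make concrete the kind of online training I have in mind, here is a minimal
sketch (my own illustration, not taken from any particular library) of online SGD
for a linear model inside a Flink KeyedProcessFunction; the `Sample` record type
and the fixed learning rate are assumptions:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Assumed input record: a feature vector with a numeric label. */
class Sample {
    public double[] features;
    public double label;
}

public class OnlineSgd extends KeyedProcessFunction<String, Sample, double[]> {

    private static final double LEARNING_RATE = 0.01; // illustrative, tune as needed

    private transient ValueState<double[]> weights;

    @Override
    public void open(Configuration parameters) {
        weights = getRuntimeContext().getState(
                new ValueStateDescriptor<>("weights", double[].class));
    }

    @Override
    public void processElement(Sample sample, Context ctx, Collector<double[]> out) throws Exception {
        double[] w = weights.value();
        if (w == null) {
            w = new double[sample.features.length]; // start from zero weights
        }
        // prediction = w . x
        double prediction = 0.0;
        for (int i = 0; i < w.length; i++) {
            prediction += w[i] * sample.features[i];
        }
        // squared-loss gradient step: w <- w - lr * (prediction - label) * x
        double error = prediction - sample.label;
        for (int i = 0; i < w.length; i++) {
            w[i] -= LEARNING_RATE * error * sample.features[i];
        }
        weights.update(w);
        out.collect(w); // emit the current model so it can be served downstream
    }
}
```

Model-parallel training (as with the parameter-server repo below) would additionally
require sharding the weights, which this sketch does not attempt.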

To initialize the pool, I have found the following options that run on top
of Flink i.e., leveraging the engine for distributed and scalable ML
training.

1) *FlinkML (DataSet API)*
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/index.html
This is not for streaming ML as it sits on top of the DataSet API. In addition,
the library was recently dropped
https://stackoverflow.com/questions/58752787/what-is-the-status-of-flinkml
but there is ongoing development (??) of a new library on top of the Table API.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
https://issues.apache.org/jira/browse/FLINK-12470
which is not in the 1.10 distribution.

2) *Apache Mahout* https://mahout.apache.org/
I thought it was long dead, but recently they started developing it again. 

3) *Apache SAMOA* https://samoa.incubator.apache.org/
They are developing it, but slowly. It has been an incubator project since 2013.

4) *FlinkML Organization* https://github.com/FlinkML
This one has interesting repos, e.g. flink-jpmml
https://github.com/FlinkML/flink-jpmml
and an implementation of a parameter server
https://github.com/FlinkML/flink-parameter-server
, which is really useful for enabling distributed training, in the sense
that the model itself is also distributed during training.
However, the repos are not really active.

5) *DeepLearning4j* https://deeplearning4j.org/
This is a distributed deep learning library that is said to also work
on top of Flink (here
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-support-for-DeepLearning4j-or-other-deep-learning-library-td12157.html)
I am not interested in GPU support at all, but I am wondering if anyone has
successfully used this one on top of Flink.

6) *Proteus - SOLMA* https://github.com/proteus-h2020/proteus-solma
It is a scalable online learning library on top of Flink, and is the output
of an H2020 research project called PROTEUS.
http://www.bdva.eu/sites/default/files/hbouchachia_sacbd-ecsa18.pdf

7) *Alibaba - ALink*
https://github.com/alibaba/Alink/blob/master/README.en-US.md
A machine learning algorithm platform from Alibaba which is actively
maintained.

These are all the systems I have found that do ML using Flink's
engine.

*Questions*
(i)  Has anyone used them?
(ii) More specifically, has someone implemented *Stochastic Gradient
Descent, Skip-gram models, Autoencoders* with any of the above tools (or
other)?

*Remarks*
If you have any experiences/comments/additions to share, please do! Gotta
Catch 'Em All!

Best,
Max






Re: Re: Flink built-in UDF performance is slow

2020-04-27 Thread forideal



Hi Jark:


Thanks for your reply!


1. Which version and which planner was the test based on?
Flink 1.9.0, Blink planner
2. Streaming mode or batch mode?
Streaming mode
3. Did you register your custom UDAF as "sum"? Could you use another name, e.g. "mysum", to avoid a possible naming conflict?
  It is registered as red_sum


Best, forideal
At 2020-04-28 11:13:50, "Jark Wu" wrote:
>Hi,
>
>I looked at your UDAF implementation; in theory it cannot be faster than the built-in count/sum. There is probably a bug somewhere, or the test setup is wrong.
>Let me first ask a few questions:
>1. Which version and which planner was the test based on?
>2. Streaming mode or batch mode?
>3. Did you register your custom UDAF as "sum"? Could you use another name, e.g. "mysum", to avoid a possible naming conflict?
>
>Best,
>Jark
>
>On Tue, 28 Apr 2020 at 10:46, forideal  wrote:
>
>> Hi all:
>>
>>
>>   I have recently been running some performance tests with Flink SQL, and I found that Flink's built-in aggregates are all quite slow, e.g. COUNT, LISTAGG,
>> etc.
>> My hand-written count is more than twice as fast as the built-in COUNT function. (I tested various windows; I am not sure whether I am using it incorrectly.)
>>
>>
>> SQL:
>>
>>
>> select
>>   query_nor,
>>   sum(cast(1 as bigint)) as query_nor_counter
>> from ods_search_track
>> group by
>>   query_nor,
>>   HOP(
>>     event_time, interval '30' SECOND, interval '30' MINUTE)
>> sum:
>> public class Sum extends AggregateFunction<Long, AtomicLong> {
>>
>> @Override
>> public boolean isDeterministic() {
>> return false;
>> }
>>
>> @Override
>> public AtomicLong createAccumulator() {
>> return new AtomicLong();
>> }
>>
>> @Override
>> public void open(FunctionContext context) throws Exception {
>>
>> }
>>
>> @Override
>> public Long getValue(AtomicLong acc) {
>> return acc.get();
>> }
>>
>> @Override
>> public TypeInformation<Long> getResultType() {
>> return Types.LONG;
>> }
>>
>> public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
>> Iterator<AtomicLong> iter = it.iterator();
>> while (iter.hasNext()) {
>> AtomicLong a = iter.next();
>> acc.addAndGet(a.get());
>> }
>> }
>>
>> public void accumulate(AtomicLong datas, Long data) {
>> datas.addAndGet(data);
>> }
>> }
>>
>>
>> Using the built-in Flink COUNT
>>
>>
>> select
>>   query_nor,
>>   count(1) as query_nor_counter
>> from ods_search_track
>> group by
>>   query_nor,
>>   HOP(
>>     event_time, interval '30' SECOND, interval '30' MINUTE)


Re: Question about watermarks on retract streams

2020-04-27 Thread Benchao Li
Hi lec,

Sorry, what I described was the previous behavior. It was recently changed in passing as part of FLINK-16887.

I had also opened an issue (https://issues.apache.org/jira/browse/FLINK-16844) to change this behavior,
but it had not yet been accepted by a committer. I can close that issue now.

So, based on the current latest code, you can consider the window operator to support retract messages.
Retract messages are handled just like ordinary append messages: expired retract messages are likewise dropped directly.



lec ssmi wrote on Tue, Apr 28, 2020 at 11:28 AM:

> But in the DataStreamGroupWindowAggregateBase class I found that both of the following methods are true:
> override def needsUpdatesAsRetraction = true
> override def consumesRetractions = true
>
> Benchao Li wrote on Tue, Apr 28, 2020 at 10:19 AM:
>
> > Hi lec,
> >
> > The window operator currently does not support retract input.
> >
> > lec ssmi wrote on Tue, Apr 28, 2020 at 9:45 AM:
> >
> > > Hi:
> > >    In the Table API, for aggregations with a time attribute, such as window aggregations, how is event-time lateness handled for retract messages?
> > >    For example,
> > >    suppose the upstream uses a last_value operation plus an over window to keep producing the latest value of a record, then joins it with another stream, and finally performs a time
> > > window aggregation. It is now ten o'clock and the maximum lateness is one hour. A message with event time nine o'clock has exceeded the maximum lateness, but the join still produces a joined record (because the join does not filter out late data), and this record retracts a previously joined record. When this retraction reaches the time
> > > window, will both the DELETE record and the INSERT record be dropped because the maximum lateness is exceeded?
> > >  Thanks.
> > >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Re: FlinkSQL Upsert/Retraction writes to MySQL

2020-04-27 Thread wangl...@geekplus.com.cn

Thanks Leonard,

If JDBCUpsertTableSink handles records in upsert mode, is the SQL statement that is actually executed an INSERT INTO ... ON DUPLICATE KEY
statement?
Where is that in the source code?

Thanks,
Wang Lei



wangl...@geekplus.com.cn 

 
From: Leonard Xu
Sent: 2020-04-27 12:58
To: user-zh
Subject: Re: FlinkSQL Upsert/Retraction writes to MySQL
Hi, wanglei

> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> For every new record arriving from Kafka, two records (a Tuple2) are generated: the old one is deleted and the new one is added.
This query produces a retract stream. You can simply think of every Kafka record producing two records downstream, but what is finally written
depends on what the downstream system supports and which sink is implemented (there are currently three kinds of sinks: AppendStreamSink, UpsertStreamSink,
RetractStreamSink).

> I looked at
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
>  and there is no retract mode.
> Is JDBCUpsertTableSink.java actually used to write to MySQL?
Among the existing sinks, Kafka implements AppendStreamSink, so it only supports insert records and does not support retract.
The MySQL table you declared with DDL corresponds to the JDBC sink JDBCUpsertTableSink, so it is handled with upsert logic; it does not support retract either.

> Without group by, directly:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> writing to MySQL fails on a primary-key conflict. How can I directly overwrite using upsert?

Without group by, the unique key of the query cannot be derived, so there is no way to update by unique key.
You just need to keep the query's key (here, the group by fields) consistent with the primary key in the database.

Best,

Leonard Xu


Re: Question about watermarks on retract streams

2020-04-27 Thread lec ssmi
But in the DataStreamGroupWindowAggregateBase class I found that both of the following methods are true:
override def needsUpdatesAsRetraction = true
override def consumesRetractions = true

Benchao Li wrote on Tue, Apr 28, 2020 at 10:19 AM:

> Hi lec,
>
> The window operator currently does not support retract input.
>
> lec ssmi wrote on Tue, Apr 28, 2020 at 9:45 AM:
>
> > Hi:
> >    In the Table API, for aggregations with a time attribute, such as window aggregations, how is event-time lateness handled for retract messages?
> >    For example,
> >    suppose the upstream uses a last_value operation plus an over window to keep producing the latest value of a record, then joins it with another stream, and finally performs a time
> > window aggregation. It is now ten o'clock and the maximum lateness is one hour. A message with event time nine o'clock has exceeded the maximum lateness, but the join still produces a joined record (because the join does not filter out late data), and this record retracts a previously joined record. When this retraction reaches the time
> > window, will both the DELETE record and the INSERT record be dropped because the maximum lateness is exceeded?
> >  Thanks.
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: Support LRU cache in JDBCLookupFunction

2020-04-27 Thread tao siyuan
Thanks.

Jark Wu wrote on Tue, Apr 28, 2020 at 11:00 AM:

> FYI
>
> 1. Add cache-hit statistics: https://issues.apache.org/jira/browse/FLINK-16038
> 2. Add an asynchronous interaction mode: https://issues.apache.org/jira/browse/FLINK-14902
>
>
> On Mon, 27 Apr 2020 at 17:01, tao siyuan  wrote:
>
> > OK, thanks
> >
> > Benchao Li wrote on Mon, Apr 27, 2020 at 5:00 PM:
> >
> > > I think that is fine.
> > >
> > > tao siyuan wrote on Mon, Apr 27, 2020 at 4:24 PM:
> > >
> > > > Thanks,
> > > >
> > > > Could I file an issue for the second suggestion, adding an asynchronous JDBCLookupFunction interface to the connector?
> > > >
> > > > Benchao Li wrote on Mon, Apr 27, 2020 at 4:11 PM:
> > > >
> > > > > Hi,
> > > > >
> > > > > The first suggestion already has an issue[1] and a PR; you can take a look.
> > > > > For the second, as far as I know there is an interface and implementation for async dimension tables, but the connector has not implemented it yet.
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-16038
> > > > >
> > > > > tao siyuan wrote on Mon, Apr 27, 2020 at 4:00 PM:
> > > > >
> > > > > > hi,
> > > > > >
> > > > > > Sorry, I overlooked the Guava cache used internally.
> > > > > >
> > > > > > May I make two suggestions here:
> > > > > > 1. add cache-hit statistics
> > > > > > 2. add an asynchronous interaction mode
> > > > > >
> > > > > > Jark Wu wrote on Mon, Apr 27, 2020 at 3:31 PM:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > The JDBC lookup currently already uses an LRU cache. Do you want the cache size to be adjustable dynamically?
> > > > > > >
> > > > > > > > On Apr 27, 2020, at 15:24, tao siyuan wrote:
> > > > > > > >
> > > > > > > > HI all:
> > > > > > > >
> > > > > > > > Currently, in some cases we need to join a stream with an external dimension table. JDBCLookupFunction only supports a cache with a fixed size and number of entries, but usually an LRU
> > > > > > > > cache policy could improve cache utilization and reduce the number of database round-trips.
> > > > > > > >
> > > > > > > > Is this an issue worth submitting?
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Benchao Li
> > > > > School of Electronics Engineering and Computer Science, Peking
> > > University
> > > > > Tel:+86-15650713730
> > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Benchao Li
> > > School of Electronics Engineering and Computer Science, Peking
> University
> > > Tel:+86-15650713730
> > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > >
> >
>


Re: Re: Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread Jark Wu
Yes.
This is an optimization: if the previous result is the same as the new
result, then the operator will not emit records for the new result.

Best,
Jark

On Tue, 28 Apr 2020 at 11:05, izual  wrote:

> Thank you, Jark.
>
> I have also tried COUNT DISTINCT ^_^, the only problem is that if the
> `tblEvent` generates two records with the same id, such as:
> t1: {"id": 1}
> t2: {"id": 1}
>
> then the SQL will only output one result, when the t1 record is received.
> I think maybe some optimizer works in the background when the result does
> not change?
>
>
> At 2020-04-28 10:53:34, "Jark Wu"  wrote:
>
> Hi izual,
>
> In such case, I think you should try COUNT DISTINCT instead of COUNT.
> DISTINCT will help to deduplicate, so no matter how many times you
> received id=1, the region count should always 3.
>
> SELECT tblEvent.id, COUNT(DISTINCT tblDim.region) FROM tblEvent JOIN
> tblDim FOR SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP
> BY tblEvent.id
>
> Best,
> Jark
>
>
> On Mon, 27 Apr 2020 at 23:41, Benchao Li  wrote:
>
>> Hi izual,
>>
>> IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
>> The state is not managed in UDAF, it's managed by aggregation operator,
>> and
>> your UDAF's aggregator will be handled by operator using state.
>>
>> izual  于2020年4月27日周一 下午11:21写道:
>>
>>> Thanks, Benchao.
>>>
>>> Maybe change the dimension table will work, but this changes a lot,
>>> include `size/count` is not the column of one dim table.
>>> I notice that user can define Aggregate Functions[1],  but this page
>>> also said:
>>> > Accumulators are automatically backup-ed by Flink’s checkpointing
>>> mechanism and restored
>>> So is it right to implement my own COUNT/SUM UDF?
>>>
>>> [1].
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>>>
>>>
>>>
>>>
>>>
>>>
>>> At 2020-04-27 17:32:14, "Benchao Li"  wrote:
>>>
>>> Hi,
>>>
>>> There is indeed a state for the aggregation result, however we cannot
>>> disable it, it's by design.
>>> StreamQueryConfig.maxIdleStateRetentionTime can control how long the
>>> state will be kept.
>> If you can ensure the time gap between two records of the same id is larger
>> than, for example,
>> 1 min, then setting the retention time to 1 min can resolve your issue.
>> If not, maybe you need to change your dimension table, making it return
>> the count directly instead
>> of returning the details.
>>>
>>> izual  于2020年4月27日周一 下午5:06写道:
>>>
 I implemented my DimTable by extending `LookupTableSource`[1], which
 stores data like:

 id=1 -> (SH, BJ, SD)

 id=2 -> (...)

 and then extends `TableFunction` to return the value corresponding to
 the lookup keys,and maybe return multi rows, for example, when lookupkeys
 is id=1, then in the `TableFunction.eval`

 ```

 collect('SH')

 collect('BJ')

 collect('SD')

 ```


 Now I want to get the region's count by id, which is from the
 tblEvent.id, sql is :


 SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
 SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
 tblEvent.id


 I expect the result of COUNT is always 3 for id = 1, no matter the id=1
 appears how many times.

 but the actual result is : 3, 6, 9, ...


 I think this is bcz the state mechanism behind COUNT, how to turn this
 off?

 Or what's the correct use for this?
 StreamQueryConfig.maxIdleStateRetentionTime or something?


 The reason not using state in flink:
 http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E

 [1]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups




>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>
>
>
>


Re: RocksDB default logging configuration

2020-04-27 Thread Yun Tang
Hi Bajaj

Currently, "state.checkpoints.dir" defines a cluster-wide location, and each
job creates its specific checkpoint location under it in a job-id
sub-directory. The same applies to the checkpoint URL in RocksDB.

And the configuration option "state.backend.rocksdb.localdir" [1] should work 
for RocksDB in Flink-1.7.1.

[1] 
https://github.com/apache/flink/blob/808cc1a23abb25bd03d24d75537a1e7c6987eef7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L285-L301
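
For example, a minimal flink-conf.yaml sketch with these options (paths are placeholders, not recommendations):

```
# Hypothetical flink-conf.yaml excerpt; paths are placeholders.
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints       # cluster-wide; a job-id subdirectory is created per job
state.savepoints.dir: hdfs:///flink/savepoints
state.backend.rocksdb.localdir: /mnt/local-ssd/rocksdb # local working directory for RocksDB
```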

Best
Yun Tang

From: Bajaj, Abhinav 
Sent: Tuesday, April 28, 2020 8:03
To: user@flink.apache.org 
Cc: Chesnay Schepler 
Subject: Re: RocksDB default logging configuration


It seems requiring the checkpoint URL to create the RocksDBStateBackend mixes 
up the operational aspects of cluster within the job.

RocksDBStateBackend stateBackend = new RocksDBStateBackend(“CHECKPOINT_URL”, 
true);
stateBackend.setDbStoragePath(“DB_STORAGE_PATH”);



Also, noticed that the RocksDBStateBackend picks up the savepoint dir from 
property “state.savepoints.dir” of the flink-conf.yaml file but does not pick 
up the “state.backend.rocksdb.localdir”.

So I had to set from the job as above.



I feel there is a disconnect and would like to get confirmation of the above 
behavior, if possible.

I am using Flink 1.7.1.



Thanks Chesnay for your response below.



~ Abhinav Bajaj



From: Chesnay Schepler 
Date: Wednesday, April 22, 2020 at 11:17 PM
To: "Bajaj, Abhinav" , "user@flink.apache.org" 

Subject: Re: RocksDB default logging configuration



CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you recognize the sender and know the content 
is safe.



AFAIK this is not possible; the client doesn't know anything about the cluster 
configuration.



FLINK-15747 proposes to add an additional config option for controlling the 
logging behavior.



The only workaround I can think of would be to create a custom Flink 
distribution with a modified RocksDBStateBackend which always sets these 
options by default.





On 23/04/2020 03:24, Bajaj, Abhinav wrote:

Bumping this one again to catch some attention.



From: "Bajaj, Abhinav" 
Date: Monday, April 20, 2020 at 3:23 PM
To: "user@flink.apache.org" 

Subject: RocksDB default logging configuration



Hi,



Some of our teams ran into the disk space issues because of RocksDB default 
logging configuration - 
FLINK-15068.

It seems the workaround suggested uses the OptionsFactory to set some of the 
parameters from inside the job.



Since we provision the Flink cluster(version 1.7.1) for the teams, we control 
the RocksDB statebackend configuration from flink-conf.yaml.

And it seems there isn’t any related RocksDB 
configuration
 to set in flink-conf.yaml.



Is there a way for the job developer to retrieve the default statebackend 
information from the cluster in the job and set the DBOptions on top of it?



Appreciate the help!



~ Abhinav Bajaj



PS:  Sharing below snippet as desired option if possible -



StreamExecutionEnvironment streamExecEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();

StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();

stateBackend.setOptions(new OptionsFactory() {

@Override
public DBOptions createDBOptions(DBOptions dbOptions) {
  dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
dbOptions.setMaxLogFileSize(1024 * 1024);
  return dbOptions;
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
columnFamilyOptions) {
  return columnFamilyOptions;
}

});








Re: Re: FlinkSQL Upsert/Retraction writes to MySQL

2020-04-27 Thread Jark Wu
https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java#L261
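
For reference, the MySQL dialect linked above generates an upsert of roughly this form
(the sink table and column names here are illustrative, following the earlier example):

```
-- Hypothetical sink table mysql_sink(f1, cnt) with PRIMARY KEY (f1); the dialect emits roughly:
INSERT INTO mysql_sink (f1, cnt)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE cnt = VALUES(cnt);
```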

Best,
Jark

On Tue, 28 Apr 2020 at 10:24, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Thanks Leonard,
>
> If JDBCUpsertTableSink handles records in upsert mode, is the SQL statement that is actually executed an INSERT INTO ... ON
> DUPLICATE KEY statement?
> Where is that in the source code?
>
> Thanks,
> Wang Lei
>
>
>
> wangl...@geekplus.com.cn
>
>
> From: Leonard Xu
> Sent: 2020-04-27 12:58
> To: user-zh
> Subject: Re: FlinkSQL Upsert/Retraction writes to MySQL
> Hi, wanglei
>
> > INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> > For every new record arriving from Kafka, two records (a Tuple2) are generated: the old one is deleted and the new one is added.
> This query produces a retract stream. You can simply think of every Kafka record producing two records downstream, but what is finally written
> depends on what the downstream system supports and which sink is implemented (there are currently three kinds of sinks: AppendStreamSink, UpsertStreamSink,
> RetractStreamSink)
>
> > I looked at
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> and there is no retract mode.
> > Is JDBCUpsertTableSink.java actually used to write to MySQL?
> Among the existing sinks, Kafka implements AppendStreamSink, so it only supports insert records and does not support retract.
> The MySQL table you declared with DDL corresponds to the JDBC sink JDBCUpsertTableSink, so it is handled with upsert logic;
> it does not support retract either.
>
> > Without group by, directly:
> > INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> > writing to MySQL fails on a primary-key conflict. How can I directly overwrite using upsert?
>
> Without group by, the unique key of the query cannot be derived, so there is no way to update by unique key.
> You just need to keep the query's key (here, the group by fields) consistent with the primary key in the database.
>
> Best,
>
> Leonard Xu
>
>


Re: Flink 1.10 memory settings

2020-04-27 Thread Xintong Song
Managed memory uses native memory, which is not managed by the JVM, so it does not show up in the JVM
startup parameters. You can refer to an earlier discussion on the mailing list [1].

Thank you~

Xintong Song


[1] http://apache-flink.147419.n8.nabble.com/Flink-tt1869.html
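
(For reference, a minimal flink-conf.yaml sketch of the TaskManager memory options discussed in this thread; the sizes are purely illustrative:)

```
# Illustrative values only, not a recommendation.
taskmanager.memory.flink.size: 10g          # total Flink memory of the TaskManager
taskmanager.memory.task.off-heap.size: 1g   # counted into -XX:MaxDirectMemorySize
taskmanager.memory.managed.size: 4g         # native memory, not visible in the JVM flags
```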


On Tue, Apr 28, 2020 at 9:32 AM 蒋佳成(Jiacheng Jiang) <920334...@qq.com>
wrote:

> hi Xintong
>
> I have another question. When Flink 1.10 starts it sets -Xmx, -Xms, -XX:MaxDirectMemorySize and -XX:MaxMetaspaceSize,
> and the website gives the formulas for these settings, but I do not see where managed
> memory is. Managed memory does not seem to belong to the JVM heap, off-heap, or metaspace. So which part of
> memory does managed memory belong to, and why is no corresponding JVM parameter set?
>
>
>
> ------ Original message ------
> From: "Xintong Song"
> Sent: Monday, Apr 27, 2020, 6:56 PM
> To: "user-zh"
> Subject: Re: Flink 1.10 memory settings
>
>
> There is no isolation between framework off-heap and task off-heap memory. Network memory
> can be considered isolated from the other two: a fixed-size buffer pool is requested at initialization, and its memory footprint is constant for the whole run.
>
> Thank you~
>
> Xintong Song
>
>
> On Mon, Apr 27, 2020 at 6:14 PM 蒋佳成(Jiacheng Jiang) <920334...@qq.com> wrote:
>
> Thank you Xintong. I have another question. On the website:
>
> -XX:MaxDirectMemorySize = Framework Off-Heap + Task Off-Heap + Network Memory
>
> Suppose MaxDirectMemorySize = 10. Is there any difference between 10 = 1 + 1 + 8 and 10 = 1 + 8 + 1? What I mean is, are these three kinds of off-heap memory isolated from each other?
> I had not set task off-heap before but could still start a job, so it feels like there is no isolation.
>
>
> ------ Original message ------
> From: Xintong Song
> Sent: Monday, Apr 27, 2020, 12:06 PM
> To: user-zh
> Subject: Re: Flink 1.10 memory settings
>
>
> Hi,
>
> Flink 1.10 controls the different memory types in a more fine-grained way, making the size of every type and purpose of memory within the total explicit. For example, consider a simplified TM with three memory types: heap,
> direct, and native, with a total size of 300mb. In previous versions, heap might be fixed at 100mb, with the remaining 200mb shared by direct and
> native. In theory direct/native could take 200/0, 100/100, or 0/200, as long as the total did not exceed 200mb.
> But once memory is overused, it is hard to tell whether direct or native memory caused it. In Flink 1.10, how much direct/native
> memory may be used is also explicitly divided, so when memory runs out we know which part caused it. The description above simplifies the memory model; in practice Flink
> distinguishes more than three memory types, see the documentation [1].
>
> As for your problem, it is caused by insufficient direct memory. Without changing the total memory size, you can increase
> 'taskmanager.memory.task.off-heap.size', see [2].
>
> Regarding isolation: in Flink, the multiple slots of one TM run in the same process, and due to the nature of the JVM, the memory of different threads within a process is not isolated. Flink
> only isolates managed memory [3], which Flink itself requests, allocates, and releases, without relying on the JVM
> garbage collector. Managed memory is currently only used by RocksDBStateBackend and some batch operators, and is unrelated to the direct OOM you are seeing.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_detail.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_trouble.html#outofmemoryerror-direct-buffer-memory
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_setup.html#%E6%89%98%E7%AE%A1%E5%86%85%E5%AD%98
>
>
> On Mon, Apr 27, 2020 at 10:31 AM 蒋佳成(Jiacheng Jiang) <
> 920334...@qq.com>
> wrote:
>
> > hi
> >
> > I have a single-node standalone Flink for testing that normally runs a few test jobs; TM memory was set to 8g. I recently
> > upgraded to Flink 1.10 and only set taskmanager.memory.flink.size to 10g. But after starting the job with the largest data volume, I can no longer start a second job; it reports insufficient memory: java.lang.OutOfMemoryError:
> > Direct buffer memory. Does one job basically use up all of the Direct
> > Memory? If so, is Flink 1.10 still suitable for running a standalone cluster? Don't slots isolate this memory?
> > Where the website explains slots there is this sentence: A TaskManager with three slots, for example, will
> > dedicate 1/3 of its managed memory to each slot. Does "its managed
> > memory" here refer to the memory specified by taskmanager.memory.managed.size?
> >
> >
> > best
> > Jungle

Re: Flink built-in UDF performance is slow

2020-04-27 Thread Jark Wu
Hi,

I looked at your UDAF implementation; in theory it cannot be faster than the built-in count/sum. There is probably a bug somewhere, or the test setup is wrong.
Let me first ask a few questions:
1. Which version and which planner was the test based on?
2. Streaming mode or batch mode?
3. Did you register your custom UDAF as "sum"? Could you use another name, e.g. "mysum", to avoid a possible naming conflict?

Best,
Jark

On Tue, 28 Apr 2020 at 10:46, forideal  wrote:

> Hi all:
>
>
>   I have recently been running some performance tests with Flink SQL, and I found that Flink's built-in aggregates are all quite slow, e.g. COUNT, LISTAGG
> etc.
> My hand-written count is more than twice as fast as the built-in COUNT function. (I tested various windows; I am not sure whether I am using it incorrectly.)
>
>
> SQL:
>
>
> select
>   query_nor,
>   sum(cast(1 as bigint)) as query_nor_counter
> from ods_search_track
> group by
>   query_nor,
>   HOP(
>     event_time, interval '30' SECOND, interval '30' MINUTE)
> sum:
> public class Sum extends AggregateFunction<Long, AtomicLong> {
>
> @Override
> public boolean isDeterministic() {
> return false;
> }
>
> @Override
> public AtomicLong createAccumulator() {
> return new AtomicLong();
> }
>
> @Override
> public void open(FunctionContext context) throws Exception {
>
> }
>
> @Override
> public Long getValue(AtomicLong acc) {
> return acc.get();
> }
>
> @Override
> public TypeInformation<Long> getResultType() {
> return Types.LONG;
> }
>
> public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
> Iterator<AtomicLong> iter = it.iterator();
> while (iter.hasNext()) {
> AtomicLong a = iter.next();
> acc.addAndGet(a.get());
> }
> }
>
> public void accumulate(AtomicLong datas, Long data) {
> datas.addAndGet(data);
> }
> }
>
>
> Using the built-in Flink COUNT
>
>
> select
>   query_nor,
>   count(1) as query_nor_counter
> from ods_search_track
> group by
>   query_nor,
>   HOP(
>     event_time, interval '30' SECOND, interval '30' MINUTE)


Re:Re: Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread izual
Thank you, Jark.


I have also tried COUNT DISTINCT ^_^, the only problem is that if the
`tblEvent` generates two records with the same id, such as:
t1: {"id": 1}
t2: {"id": 1}


then the SQL will only output one result, when the t1 record is received.
I think maybe some optimizer works in the background when the result does not change?




At 2020-04-28 10:53:34, "Jark Wu"  wrote:

Hi izual,


In such case, I think you should try COUNT DISTINCT instead of COUNT. 
DISTINCT will help to deduplicate, so no matter how many times you received 
id=1, the region count should always 3. 


SELECT tblEvent.id, COUNT(DISTINCT tblDim.region) FROM tblEvent JOIN tblDim FOR 
SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id


Best,
Jark




On Mon, 27 Apr 2020 at 23:41, Benchao Li  wrote:

Hi izual,


IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
The state is not managed in UDAF, it's managed by aggregation operator, and 
your UDAF's aggregator will be handled by operator using state.


izual  于2020年4月27日周一 下午11:21写道:

Thanks, Benchao.


Maybe change the dimension table will work, but this changes a lot, include 
`size/count` is not the column of one dim table.
I notice that user can define Aggregate Functions[1],  but this page also said:
> Accumulators are automatically backup-ed by Flink’s checkpointing mechanism 
> and restored
So is it right to implement my own COUNT/SUM UDF?


[1]. 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions


















At 2020-04-27 17:32:14, "Benchao Li"  wrote:

Hi,


There is indeed a state for the aggregation result, however we cannot disable 
it, it's by design.
StreamQueryConfig.maxIdleStateRetentionTime can control how long the state will 
be kept.
If you can ensure the time gap between two records of the same id is larger than,
for example,
1 min, then setting the retention time to 1 min can resolve your issue.
If not, maybe you need to change your dimension table, making it return the
count directly instead
of returning the details.


izual  于2020年4月27日周一 下午5:06写道:


I implemented my DimTable by extending `LookupTableSource`[1], which stores data
like:

id=1 -> (SH, BJ, SD)

id=2 -> (...)

and then extends `TableFunction` to return the value corresponding to the 
lookup keys,and maybe return multi rows, for example, when lookupkeys is id=1, 
then in the `TableFunction.eval`

```

collect('SH')

collect('BJ')

collect('SD')

```




Now I want to get the region's count by id, which is from the tblEvent.id; the sql is
:




SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR SYSTEM 
AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id




I expect the result of COUNT is always 3 for id = 1, no matter the id=1 appears 
how many times.

but the actual result is : 3, 6, 9, ...




I think this is bcz the state mechanism behind COUNT, how to turn this off?

Or what's the correct use for this? StreamQueryConfig.maxIdleStateRetentionTime 
or something?




The reason not using state in flink: 
http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E

[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups





 





--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn




 





--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread Jark Wu
Hi izual,

In such case, I think you should try COUNT DISTINCT instead of COUNT.
DISTINCT will help to deduplicate, so no matter how many times you received
id=1, the region count should always 3.

SELECT tblEvent.id, COUNT(DISTINCT tblDim.region) FROM tblEvent JOIN tblDim
FOR SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
tblEvent.id

Best,
Jark


On Mon, 27 Apr 2020 at 23:41, Benchao Li  wrote:

> Hi izual,
>
> IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
> The state is not managed in UDAF, it's managed by aggregation operator,
> and
> your UDAF's aggregator will be handled by operator using state.
>
> izual  于2020年4月27日周一 下午11:21写道:
>
>> Thanks, Benchao.
>>
>> Maybe change the dimension table will work, but this changes a lot,
>> include `size/count` is not the column of one dim table.
>> I notice that user can define Aggregate Functions[1],  but this page also
>> said:
>> > Accumulators are automatically backup-ed by Flink’s checkpointing
>> mechanism and restored
>> So is it right to implement my own COUNT/SUM UDF?
>>
>> [1].
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>>
>>
>>
>>
>>
>>
>> At 2020-04-27 17:32:14, "Benchao Li"  wrote:
>>
>> Hi,
>>
>> There is indeed a state for the aggregation result, however we cannot
>> disable it, it's by design.
>> StreamQueryConfig.maxIdleStateRetentionTime can control how long the
>> state will be kept.
>> If you can ensure the time gap between two records of the same id is larger
>> than, for example,
>> 1 min, then setting the retention time to 1 min can resolve your issue.
>> If not, maybe you need to change your dimension table, making it return
>> the count directly instead
>> of returning the details.
>>
>> izual  于2020年4月27日周一 下午5:06写道:
>>
>>> I implemented my DimTable by extending `LookupTableSource`[1], which stores
>>> data like:
>>>
>>> id=1 -> (SH, BJ, SD)
>>>
>>> id=2 -> (...)
>>>
>>> and then extends `TableFunction` to return the value corresponding to
>>> the lookup keys,and maybe return multi rows, for example, when lookupkeys
>>> is id=1, then in the `TableFunction.eval`
>>>
>>> ```
>>>
>>> collect('SH')
>>>
>>> collect('BJ')
>>>
>>> collect('SD')
>>>
>>> ```
>>>
>>>
>>> Now I want to get the region's count by id, which is from the tblEvent.id,
>>> sql is :
>>>
>>>
>>> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
>>> SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
>>> tblEvent.id
>>>
>>>
>>> I expect the result of COUNT is always 3 for id = 1, no matter the id=1
>>> appears how many times.
>>>
>>> but the actual result is : 3, 6, 9, ...
>>>
>>>
>>> I think this is bcz the state mechanism behind COUNT, how to turn this
>>> off?
>>>
>>> Or what's the correct use for this?
>>> StreamQueryConfig.maxIdleStateRetentionTime or something?
>>>
>>>
>>> The reason not using state in flink:
>>> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>
>>
>>
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: Flink Lookup Filter Pushdown

2020-04-27 Thread Jark Wu
Hi forideal,

Currently, dimension table join requires at least one join key. That is the
`u_id` in your example. The join key will be used as the lookup key.
If you have additional filters on the dimension table, that's fine: Flink
will help to filter the fetched data.
That means Flink supports the following dimension join query:

select score_a ... left join ... source_table.u_id=dim_u_score.u_id
where dim_u_score.score_b > 1;
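
(For completeness, a fuller sketch of such a lookup join with a filter, using the
standard temporal-join syntax; the table and column names follow your example and
the alias and proctime attribute name are illustrative:)

```
-- Sketch only; assumes the processing-time attribute on source_table is called proctime.
SELECT s.u_id, d.score_a
FROM source_table AS s
JOIN dim_u_score FOR SYSTEM_TIME AS OF s.proctime AS d
  ON s.u_id = d.u_id
WHERE d.score_b > 1;
```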


At present, dimension table join doesn't pushdown filters into source, so
if the data associated to the given lookup key is very large, it will have
a high IO cost.
However, filter pushdown into lookup table is in the roadmap.

Best,
Jark







On Mon, 27 Apr 2020 at 20:08, forideal  wrote:

> Hello, my friend.
>
> I have a dimension table.
>
> create table dim_u_score (u_id bigint, varchar, score_a double, score_b
> double) with {xxx}
>
> In one scenario,
>
> the lookup condition is the filter score_a > 0.9
>
> In another scenario,
>
> the lookup condition is the filter score_b > 1
>
> In Flink, at present, a lookup join can use ON to pass key values, such as
>
> select score_a ... left join ... source_table.u_id=dim_u_score.u_id
>
> If so, what should I do?
>
> If not, can I instead create multiple tables with the conditions built in, to
> use as needed?
>
> such as
>
> create table dim_u_score_filter_a (u_id bigint, varchar, score_a double,
> score_b double) with {"filter_condition"="score_a > 0.9"}
> create table dim_u_score_filter_b (u_id bigint, varchar, score_a double,
> score_b double) with {"filter_condition"="score_b > 1"}
>
> Then, in the process of lookup, push down to the specific execution engine
> to complete the lookup filter.
>
>
>
>


Flink built-in UDF performance is slow

2020-04-27 Thread forideal
Hi all:


  I have recently been running some performance tests with Flink SQL, and I found that Flink's built-in aggregates are all quite slow, e.g. COUNT, LISTAGG, etc.
My hand-written count is more than twice as fast as the built-in COUNT function. (I tested various windows; I am not sure whether I am using it incorrectly.)


SQL:


select
  query_nor,
  sum(cast(1 as bigint)) as query_nor_counter
from ods_search_track
group by
  query_nor,
  HOP(
    event_time, interval '30' SECOND, interval '30' MINUTE)
sum:
public class Sum extends AggregateFunction<Long, AtomicLong> {

@Override
public boolean isDeterministic() {
return false;
}

@Override
public AtomicLong createAccumulator() {
return new AtomicLong();
}

@Override
public void open(FunctionContext context) throws Exception {

}

@Override
public Long getValue(AtomicLong acc) {
return acc.get();
}

@Override
public TypeInformation<Long> getResultType() {
return Types.LONG;
}

public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
Iterator<AtomicLong> iter = it.iterator();
while (iter.hasNext()) {
AtomicLong a = iter.next();
acc.addAndGet(a.get());
}
}

public void accumulate(AtomicLong datas, Long data) {
datas.addAndGet(data);
}
}


Using the built-in Flink COUNT


select
  query_nor,
  count(1) as query_nor_counter
from ods_search_track
group by
  query_nor,
  HOP(
    event_time, interval '30' SECOND, interval '30' MINUTE)

Re: Using PyFlink on Windows

2020-04-27 Thread tao siyuan
It does not seem to have any effect. I copied all the packages under site-packages into External Libraries, but the copy path shown is still site-packages.

Zhefu PENG wrote on Tue, Apr 28, 2020 at 10:16 AM:

> You can try adding everything under site-packages to the external libs; it can help improve development efficiency.
>
> On Tue, Apr 28, 2020 at 10:13 tao siyuan  wrote:
>
> > Currently PyCharm does not support PyFlink development. On Windows,
> > are there other effective tools or ways to more conveniently support PyFlink development, debugging, and source tracking?
> >
>


Re: Using PyFlink on Windows

2020-04-27 Thread Wei Zhong
Hi Tao,

Windows support for PyFlink is under development and is expected to ship in 1.11. That will solve the problem of developing PyFlink on Windows.

> On Apr 28, 2020, at 10:23, tao siyuan wrote:
> 
> OK, I will give it a try
> 
> Zhefu PENG wrote on Tue, Apr 28, 2020 at 10:16 AM:
> 
>> You can try adding everything under site-packages to the external libs; it can help improve development efficiency.
>> 
>> On Tue, Apr 28, 2020 at 10:13 tao siyuan  wrote:
>> 
>>> Currently PyCharm does not support PyFlink development. On Windows,
>>> are there other effective tools or ways to more conveniently support PyFlink development, debugging, and source tracking?
>>> 
>> 



Re: Using PyFlink on Windows

2020-04-27 Thread tao siyuan
OK, I will give it a try

Zhefu PENG wrote on Tue, Apr 28, 2020 at 10:16 AM:

> You can try adding everything under site-packages to the external libs; it can help improve development efficiency.
>
> On Tue, Apr 28, 2020 at 10:13 tao siyuan  wrote:
>
> > Currently PyCharm does not support PyFlink development. On Windows,
> > are there other effective tools or ways to more conveniently support PyFlink development, debugging, and source tracking?
> >
>


Re: Question about watermarks on retract streams

2020-04-27 Thread Benchao Li
Hi lec,

The window operator currently does not support retract input.

lec ssmi wrote on Tue, Apr 28, 2020 at 9:45 AM:

> Hi:
>    In the Table API, for aggregations with a time attribute, such as window aggregations, how is event-time lateness handled for retract messages?
>    For example,
>    suppose the upstream uses a last_value operation plus an over window to keep producing the latest value of a record, then joins it with another stream, and finally performs a time
>
> window aggregation. It is now ten o'clock and the maximum lateness is one hour. A message with event time nine o'clock has exceeded the maximum lateness, but the join still produces a joined record (because the join does not filter out late data), and this record retracts a previously joined record. When this retraction reaches the time
> window, will both the DELETE record and the INSERT record be dropped because the maximum lateness is exceeded?
>  Thanks.
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Using PyFlink on Windows

2020-04-27 Thread Zhefu PENG
You can try adding everything under site-packages to the external libs; it can help improve development efficiency.

On Tue, Apr 28, 2020 at 10:13 tao siyuan  wrote:

> Currently PyCharm does not support PyFlink development. On Windows,
> are there other effective tools or ways to more conveniently support PyFlink development, debugging, and source tracking?
>


Using PyFlink on Windows

2020-04-27 Thread tao siyuan
Currently PyCharm does not support PyFlink development. On Windows,
are there other effective tools or ways to more conveniently support PyFlink development, debugging, and source tracking?


Question about watermarks on retract streams

2020-04-27 Thread lec ssmi
Hi:
   In the Table API, for aggregations with a time attribute, such as window aggregations, how is event-time lateness handled for retract messages?
   For example,
   suppose the upstream uses a last_value operation plus an over window to keep producing the latest value of a record, then joins it with another stream, and finally performs a time
window aggregation. It is now ten o'clock and the maximum lateness is one hour. A message with event time nine o'clock has exceeded the maximum lateness, but the join still produces a joined record (because the join does not filter out late data), and this record retracts a previously joined record. When this retraction reaches the time
window, will both the DELETE record and the INSERT record be dropped because the maximum lateness is exceeded?
 Thanks.


Flink backpressure question

2020-04-27 Thread 阿华田
Our production job monitors backpressure, and backpressure has always been normal, yet the job shows large data delays. Doesn't data delay cause backpressure?


Wang Zhihua
a15733178...@163.com



Re: Flink 1.9.2 why always checkpoint expired

2020-04-27 Thread qq
Hi Jiayi Liao,

  Thanks for your reply. I have added the attachment, but I can't get any useful information from it.

 


> On Apr 27, 2020, at 12:40, Jiayi Liao wrote:
> 
> <pasted image-1.tiff>



Re: RocksDB default logging configuration

2020-04-27 Thread Bajaj, Abhinav
It seems requiring the checkpoint URL to create the RocksDBStateBackend mixes 
up the operational aspects of cluster within the job.
RocksDBStateBackend stateBackend = new RocksDBStateBackend(“CHECKPOINT_URL”, 
true);
stateBackend.setDbStoragePath(“DB_STORAGE_PATH”);

Also, noticed that the RocksDBStateBackend picks up the savepoint dir from 
property “state.savepoints.dir” of the flink-conf.yaml file but does not pick 
up the “state.backend.rocksdb.localdir”.
So I had to set from the job as above.

I feel there is a disconnect and would like to get confirmation of the above 
behavior, if possible.
I am using Flink 1.7.1.

Thanks Chesnay for your response below.

~ Abhinav Bajaj

From: Chesnay Schepler 
Date: Wednesday, April 22, 2020 at 11:17 PM
To: "Bajaj, Abhinav" , "user@flink.apache.org" 

Subject: Re: RocksDB default logging configuration

CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you recognize the sender and know the content 
is safe.

AFAIK this is not possible; the client doesn't know anything about the cluster 
configuration.

FLINK-15747 proposes to add an additional config option for controlling the 
logging behavior.

The only workaround I can think of would be to create a custom Flink 
distribution with a modified RocksDBStateBackend which always sets these 
options by default.


On 23/04/2020 03:24, Bajaj, Abhinav wrote:
Bumping this one again to catch some attention.

From: "Bajaj, Abhinav" 
Date: Monday, April 20, 2020 at 3:23 PM
To: "user@flink.apache.org" 

Subject: RocksDB default logging configuration

Hi,

Some of our teams ran into the disk space issues because of RocksDB default 
logging configuration - 
FLINK-15068.
It seems the workaround suggested uses the OptionsFactory to set some of the 
parameters from inside the job.

Since we provision the Flink cluster(version 1.7.1) for the teams, we control 
the RocksDB statebackend configuration from flink-conf.yaml.
And it seems there isn’t any related RocksDB 
configuration
 to set in flink-conf.yaml.

Is there a way for the job developer to retrieve the default statebackend 
information from the cluster in the job and set the DBOptions on top of it?

Appreciate the help!

~ Abhinav Bajaj

PS:  Sharing below snippet as desired option if possible -

StreamExecutionEnvironment streamExecEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();
stateBackend.setOptions(new OptionsFactory() {
@Override
public DBOptions createDBOptions(DBOptions dbOptions) {
  dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
  dbOptions.setMaxLogFileSize(1024 * 1024);
  return dbOptions;
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
columnFamilyOptions) {
  return columnFamilyOptions;
}
});






Re: "Fill in" notification messages based on event time watermark

2020-04-27 Thread David Anderson
Following up on Piotr's outline, there's an example in the documentation of
how to use a KeyedProcessFunction to implement an event-time tumbling
window [1]. Perhaps that can help you get started.

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example


On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski  wrote:

> Hi,
>
> I’m not sure, but I don’t think there is an existing window that would do
> exactly what you want. I would suggest to go back to the
> `keyedProcessFunction` (or a custom operator?), and have a
> MapState currentStates field. Your key would
> be for example a timestamp of the beginning of your window. Value would be
> the latest state in this time window, annotated with a timestamp when this
> state was record.
>
> On each element:
>
> 1. you determine the window’s begin ts (key of the map)
> 2. If it’s first element, register an event time timer to publish results
> for that window’s end TS
> 3. look into the `currentStates` if it should be modified (if your new
> element is newer or first value for the given key)
>
> On event time timer firing
> 1. output the state matching to this timer
> 2. Check if there is a (more recent) value for next window, and if not:
>
> 3. copy the value to next window
> 4. Register a timer for this window to fire
>
> 5. Clean up currentState and remove the value for the no longer needed key.
>
> I hope this helps
>
> Piotrek
>
> On 27 Apr 2020, at 12:01, Manas Kale  wrote:
>
> Hi,
> I have an upstream operator that outputs device state transition messages
> with event timestamps. Meaning it only emits output when a transition takes
> place.
> For example,
> state1 @ 1 PM
> state2 @ 2 PM
> and so on.
>
> *Using a downstream operator, I want to emit notification messages as per
> some configured periodicity.* For example, if periodicity = 20 min, in
> the above scenario this operator will output :
> state1 notification @ 1PM
> state1 notification @ 1.20PM
> state1 notification @ 1.40PM
>  ...
>
> *Now the main issue is that I want this to be driven by the watermark and
> not by transition events received from upstream. *Meaning I would like to
> see notification events as soon as the watermark crosses their timestamps;
> *not* when the next transition event arrives at the operator (which could
> be hours later, as above).
>
> My first solution, using a keyedProcessFunction and timers did not work as
> expected because the order in which transition events arrived at this
> operator was non-deterministic. To elaborate, assume a
> setAutoWatermarkInterval of 10 second.
> If we get transition events :
> state1 @ 1sec
> state2 @ 3 sec
> state3 @ 5 sec
> state1 @ 8 sec
> the order in which these events arrived at my keyedProcessFunction was not
> fixed. To solve this, these messages need to be sorted on event time, which
> led me to my second solution.
>
> My second solution, using a EventTimeTumblingWindow with size =
> setAutoWatermarkInterval, also does not work. I sorted accumulated events
> in the window and applied notification-generation logic on them in order.
> However, I assumed that windows are created even if there are no elements.
> Since this is not the case, this solution generates notifications only when
> the next state transition message arrives, which could be hours later.
>
> Does anyone have any suggestions on how I can implement this?
> Thanks!
>
>
>
>
>


Re: "Fill in" notification messages based on event time watermark

2020-04-27 Thread Piotr Nowojski
Hi,

I’m not sure, but I don’t think there is an existing window that would do 
exactly what you want. I would suggest to go back to the `keyedProcessFunction` 
(or a custom operator?), and have a MapState 
currentStates field. Your key would be for example a timestamp of the beginning 
of your window. Value would be the latest state in this time window, annotated 
with a timestamp when this state was record.

On each element:

1. you determine the window’s begin ts (key of the map)
2. If it’s first element, register an event time timer to publish results for 
that window’s end TS
3. look into the `currentStates` if it should be modified (if your new element 
is newer or first value for the given key)

On event time timer firing
1. output the state matching to this timer
2. Check if there is a (more recent) value for next window, and if not:
 
3. copy the value to next window
4. Register a timer for this window to fire

5. Clean up currentState and remove the value for the no longer needed key.
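
A rough sketch of this, just to illustrate (the `StateEvent` POJO with `state`/`ts` fields and the fixed 20-minute window size are assumptions, not your actual types):

```
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Assumed record: a device state with its event timestamp. */
class StateEvent {
    public String state;
    public long ts;
}

public class FillInNotifier extends KeyedProcessFunction<String, StateEvent, StateEvent> {

    private static final long WINDOW_SIZE = 20 * 60 * 1000L; // 20 min, illustrative

    private transient MapState<Long, StateEvent> currentStates; // window start -> latest state in window

    @Override
    public void open(Configuration parameters) {
        currentStates = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("currentStates", Long.class, StateEvent.class));
    }

    @Override
    public void processElement(StateEvent event, Context ctx, Collector<StateEvent> out) throws Exception {
        long windowStart = event.ts - (event.ts % WINDOW_SIZE);
        StateEvent existing = currentStates.get(windowStart);
        if (existing == null) {
            // first element of this window: register a timer for the window end
            ctx.timerService().registerEventTimeTimer(windowStart + WINDOW_SIZE - 1);
        }
        if (existing == null || event.ts > existing.ts) {
            currentStates.put(windowStart, event);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<StateEvent> out) throws Exception {
        long windowStart = timestamp + 1 - WINDOW_SIZE;
        StateEvent latest = currentStates.get(windowStart);
        if (latest == null) {
            return;
        }
        out.collect(latest); // emit the notification for this window

        long nextWindowStart = windowStart + WINDOW_SIZE;
        if (!currentStates.contains(nextWindowStart)) {
            // no newer state yet: carry the value forward and keep the timer chain alive
            currentStates.put(nextWindowStart, latest);
            ctx.timerService().registerEventTimeTimer(nextWindowStart + WINDOW_SIZE - 1);
        }
        currentStates.remove(windowStart); // cleanup
    }
}
```

The carried-forward copy in steps 3/4 is what keeps notifications flowing while no new transitions arrive.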

I hope this helps

Piotrek 

> On 27 Apr 2020, at 12:01, Manas Kale  wrote:
> 
> Hi,
> I have an upstream operator that outputs device state transition messages 
> with event timestamps. Meaning it only emits output when a transition takes 
> place.
> For example, 
> state1 @ 1 PM
> state2 @ 2 PM
> and so on. 
> 
> Using a downstream operator, I want to emit notification messages as per some 
> configured periodicity. For example, if periodicity = 20 min, in the above 
> scenario this operator will output : 
> state1 notification @ 1PM
> state1 notification @ 1.20PM
> state1 notification @ 1.40PM
>  ...
> 
> Now the main issue is that I want this to be driven by the watermark and not 
> by transition events received from upstream. Meaning I would like to see 
> notification events as soon as the watermark crosses their timestamps; not 
> when the next transition event arrives at the operator (which could be hours 
> later, as above).
> 
> My first solution, using a keyedProcessFunction and timers did not work as 
> expected because the order in which transition events arrived at this 
> operator was non-deterministic. To elaborate, assume a 
> setAutoWatermarkInterval of 10 second.
> If we get transition events :
> state1 @ 1sec
> state2 @ 3 sec
> state3 @ 5 sec
> state1 @ 8 sec
> the order in which these events arrived at my keyedProcessFunction was not 
> fixed. To solve this, these messages need to be sorted on event time, which 
> led me to my second solution.
> 
> My second solution, using a EventTimeTumblingWindow with size = 
> setAutoWatermarkInterval, also does not work. I sorted accumulated events in 
> the window and applied notification-generation logic on them in order. 
> However, I assumed that windows are created even if there are no elements. 
> Since this is not the case, this solution generates notifications only when 
> the next state tranisition message arrives, which could be hours later.
> 
> Does anyone have any suggestions on how I can implement this?
> Thanks!
> 
> 
> 



Re: Task Assignment

2020-04-27 Thread Piotr Nowojski
Hi Navneeth,

But what’s the problem with using `keyBy(…)`? If you have a set of keys that 
you want to process together, in other words they are are basically equal from 
the `keyBy(…)` perspective, why can’t you use this in your `KeySelector`?

Maybe to make it clear, you can think about this in two steps. You have the 
sets of keys that you want to processed together, S_1, S_2, …, S_n. Each S_i 
can contain multiple keys. The steps would:
1. You could create an artificial field, index of the set, and add it to your 
record by using some mapping function.
2. You can keyBy records using this index
After this, operator after keyBy will be receiving only keys from one of the 
sets.

(Those two operations could be done also as a single step inside `KeySelector`)
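
For illustration only, a sketch of that single-step KeySelector (the `Event` record type and the precomputed key-to-set-index map are assumptions about your data):

```
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.functions.KeySelector;

/** Assumed record type with a string key. */
class Event {
    public String key;
}

// Keys records by the index of the key-set their key belongs to,
// so all keys of one set S_i land on the same downstream task.
public class KeySetSelector implements KeySelector<Event, Integer> {

    // key -> index of the set S_i it belongs to; must be deterministic across all parallel instances
    private final Map<String, Integer> setIndexByKey = new HashMap<>();

    public KeySetSelector(Map<String, Integer> setIndexByKey) {
        this.setIndexByKey.putAll(setIndexByKey);
    }

    @Override
    public Integer getKey(Event event) {
        // fall back to hashing for keys outside the predefined sets
        return setIndexByKey.getOrDefault(event.key, event.key.hashCode());
    }
}

// usage (sketch): stream.keyBy(new KeySetSelector(setIndexByKey)).process(...)
```

The important property is that the mapping is deterministic, so every parallel instance computes the same set index for a given key.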

Piotrek  

> On 27 Apr 2020, at 09:28, Marta Paes Moreira  > wrote:
> 
> Sorry — I didn't understand you were dealing with multiple keys. 
> 
> In that case, I'd recommend you read about key-group assignment [1] and check 
> the KeyGroupRangeAssignment class [2]. 
> 
> Key-groups are assigned to parallel tasks as ranges before the job is started 
> — this is also a well-defined behaviour in Flink, with implications in state 
> reassignment on rescaling. I'm afraid that if you try to hardwire this 
> behaviour into your code, the job might not be transparently rescalable 
> anymore.
> 
> [1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html 
> 
> [2] 
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
>  
> 
>  
> 
> On Fri, Apr 24, 2020 at 7:10 AM Navneeth Krishnan  > wrote:
> Hi Marta,
> 
> Thanks for you response. What I'm looking for is something like data 
> localization. If I have one TM which is processing a set of keys, I want to 
> ensure all keys of the same type goes to the same TM rather than using 
> hashing to find the downstream slot. I could use a common key to do this but 
> I would have to parallelize as much as possible since the number of incoming 
> messages is too large to narrow down to a single key and processing it.
> 
> Thanks
> 
> On Thu, Apr 23, 2020 at 2:02 AM Marta Paes Moreira  > wrote:
> Hi, Navneeth.
> 
> If you key your stream using stream.keyBy(…), this will logically split your 
> input and all the records with the same key will be processed in the same 
> operator instance. This is the default behavior in Flink for keyed streams 
> and transparently handled.
> 
> You can read more about it in the documentation [1].
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state-and-operator-state
>  
> 
> On Thu, Apr 23, 2020 at 7:44 AM Navneeth Krishnan  > wrote:
> Hi All,
> 
> Is there a way for an upstream operator to know how the downstream operator 
> tasks are assigned? Basically I want to group my messages to be processed on 
> slots in the same node based on some key.
> 
> Thanks



Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-04-27 Thread aj
Hello Yang,
My Hadoop version is Hadoop 3.2.1-amzn-0,
and I have put flink-shaded-hadoop-2-uber-2.8.3-10.0.jar in flink/lib.

Then I get the error below:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1587983834922_0002/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.IllegalArgumentException: Invalid
rule: /L
  RULE:[2:$1@$0](.*@)s/@.*///L
  DEFAULT
at
org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
at
org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
at
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
at
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at
org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
at
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)


If I remove flink-shaded-hadoop-2-uber-2.8.3-10.0.jar from lib, then
I get the error below:

2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
-  Classpath:
/usr/lib/flink/lib/flink-table-blink_2.11-1.10.0.jar:/usr/lib/flink/lib/flink-table_2.11-1.10.0.jar:/usr/lib/flink/lib/log4j-1.2.17.jar:/usr/lib/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/lib/flink/lib/flink-dist_2.11-1.10.0.jar::/etc/hadoop/conf:/etc/hadoop/conf
2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
-

2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.memory.process.size, 1568m
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: env.yarn.conf.dir, /etc/hadoop/conf
2020-04-27 16:59:37,300 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
2020-04-27 16:59:37,301 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-04-27 16:59:37,301 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: classloader.resolve-order, parent-first
2020-04-27 16:59:37,301 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: s3.access-key, AKIA52DD5QA5FC7HPKXG
2020-04-27 16:59:37,301 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: s3.secret-key, **
2020-04-27 16:59:37,305 WARN  org.apache.flink.client.cli.CliFrontend
- Could not load CLI class
org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1076)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1030)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:957)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at 

Re: Updating Closure Variables

2020-04-27 Thread Yun Gao
 Hi Senthil,
  I think you are right that you cannot update closure variables
directly and expect them to show up at the workers.

  If the variable values are read from S3 files, I think you currently
need to define a source explicitly that reads the latest value of the file.
Whether to use a broadcast stream depends on how you want to access the
set of strings: if you want to broadcast the same strings to all the tasks, then
a broadcast stream is the solution; if you want to distribute the set of
strings in some other way, you could also use the more generic connected streams, e.g.
 streamA.connect(streamB.keyBy()).process(xx). [1]

Best,
 Yun

 [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/#datastream-transformations
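
 A minimal sketch of the broadcast-state variant (only an illustration; the element type String, the state key "current", and the source that periodically re-reads the S3 file are assumptions):

```
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class FilterSetBroadcast {

    // single-entry broadcast state holding the whole filter set
    static final MapStateDescriptor<String, Set<String>> FILTER_SET_DESCRIPTOR =
            new MapStateDescriptor<>("filterSet", Types.STRING,
                    TypeInformation.of(new TypeHint<Set<String>>() {}));

    public static DataStream<String> apply(DataStream<String> events,
                                           DataStream<Set<String>> filterSetUpdates) {
        // filterSetUpdates is assumed to come from a source that re-reads the S3 file periodically
        BroadcastStream<Set<String>> broadcast = filterSetUpdates.broadcast(FILTER_SET_DESCRIPTOR);

        return events
                .connect(broadcast)
                .process(new BroadcastProcessFunction<String, Set<String>, String>() {

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        Set<String> filterSet = ctx.getBroadcastState(FILTER_SET_DESCRIPTOR).get("current");
                        // pass the record through only if it is in the latest known set
                        if (filterSet != null && filterSet.contains(value)) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(Set<String> newSet, Context ctx, Collector<String> out) throws Exception {
                        // replace the old set whenever a fresh one arrives
                        ctx.getBroadcastState(FILTER_SET_DESCRIPTOR).put("current", new HashSet<>(newSet));
                    }
                });
    }
}
```

 Broadcast state is checkpointed, so the latest set survives restarts.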



--
From:Senthil Kumar 
Send Time:2020 Apr. 27 (Mon.) 21:51
To:user@flink.apache.org 
Subject:Updating Closure Variables

Hello Flink Community!

We have a flink streaming application with a particular use case where a 
closure variable Set is used in a filter function.

Currently, the variable is set at startup time.

It’s populated from an S3 location, where several files exist (we consume the 
one with the last updated timestamp).

Is it possible to periodically update (say once every 24 hours) this closure 
variable?

My initial research indicates that we cannot update closure variables and 
expect them to show up at the workers.

There seems to be something called BroadcastStream in Flink. 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Is that the right approach? I would like some kind of a confirmation before I 
go deeper into it.

cheers
Kumar



RE: History Server Not Showing Any Jobs - File Not Found?

2020-04-27 Thread Hailu, Andreas
bash-4.1$ ls -l /local/scratch/flink_historyserver_tmpdir/
total 8
drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:43 
flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9
drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:22 
flink-web-history-95b3f928-c60f-4351-9926-766c6ad3ee76

There are just two directories in here. I don't see cache directories from my 
attempts today, which is interesting. Looking a little deeper into them:

bash-4.1$ ls -lr 
/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9
total 1756
drwxrwxr-x 2 p2epdlsuat p2epdlsuat 1789952 Apr 21 10:44 jobs
bash-4.1$ ls -lr 
/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9/jobs
total 0
-rw-rw-r-- 1 p2epdlsuat p2epdlsuat 0 Apr 21 10:43 overview.json

There are indeed archives already in HDFS - I've included some in my initial 
mail, but here they are again just for reference:
-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs
Found 44282 items
-rw-r-   3 delp datalake_admin_dev  50569 2020-03-21 23:17 
/user/p2epda/lake/delp_qa/flink_hs/000144dba9dc0f235768a46b2f26e936
-rw-r-   3 delp datalake_admin_dev  49578 2020-03-03 08:45 
/user/p2epda/lake/delp_qa/flink_hs/000347625d8128ee3fd0b672018e38a5
-rw-r-   3 delp datalake_admin_dev  50842 2020-03-24 15:19 
/user/p2epda/lake/delp_qa/flink_hs/0004be6ce01ba9677d1eb619ad0fa757
...


// ah

From: Chesnay Schepler 
Sent: Monday, April 27, 2020 10:28 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

If historyserver.web.tmpdir is not set then java.io.tmpdir is used, so that 
should be fine.

What are the contents of /local/scratch/flink_historyserver_tmpdir?
I assume there are already archives in HDFS?

On 27/04/2020 16:02, Hailu, Andreas wrote:
My machine's /tmp directory is not large enough to support the archived files, 
so I changed my java.io.tmpdir to be in some other location which is 
significantly larger. I hadn't set anything for historyserver.web.tmpdir, so I 
suspect it was still pointing at /tmp. I just tried setting 
historyserver.web.tmpdir to the same location as my java.io.tmpdir location, 
but I'm afraid I'm still seeing the following issue:

2020-04-27 09:37:42,904 [nioEventLoopGroup-3-4] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/overview.json from classloader
2020-04-27 09:37:42,906 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader

flink-conf.yaml for reference:
jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
historyserver.web.tmpdir: /local/scratch/flink_historyserver_tmpdir/

Did you have anything else in mind when you said pointing somewhere funny?

// ah

From: Chesnay Schepler 
Sent: Monday, April 27, 2020 5:56 AM
To: Hailu, Andreas [Engineering] 
; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?


overview.json is a generated file that is placed in the local directory 
controlled by historyserver.web.tmpdir.

Have you configured this option to point to some non-local filesystem? (Or if 
not, is the java.io.tmpdir property pointing somewhere funny?)
On 24/04/2020 18:24, Hailu, Andreas wrote:
I'm having a further look at the code in HistoryServerStaticFileServerHandler - 
is there an assumption about where overview.json is supposed to be located?

// ah

From: Hailu, Andreas [Engineering]
Sent: Wednesday, April 22, 2020 1:32 PM
To: 'Chesnay Schepler' ; Hailu, 
Andreas [Engineering] 
; 
user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Hi Chesnay, thanks for responding. We're using Flink 1.9.1. I enabled DEBUG 
level logging and this is something relevant I see:

2020-04-22 13:25:52,566 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - Connecting to datanode 10.79.252.101:1019
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, 
remoteHostTrusted = false
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL client skipping handshake in secured 
configuration with privileged port for addr = /10.79.252.101, datanodeId = 
DatanodeI
nfoWithStorage[10.79.252.101:1019,DS-7f4ec55d-7c5f-4a0e-b817-d9e635480b21,DISK]
2020-04-22 13:25:52,571 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - DFSInputStream has been closed already
2020-04-22 13:25:52,573 [nioEventLoopGroup-3-6] DEBUG 

Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Dawid Wysakowicz
What I meant by "Unfortunately you can not reorder the fields that way."
is that

   tableEnv.fromDataStream(input, “name, age, height");

uses the so-called referenceByPosition mode. It will name the f0 field
-> name, the f1 -> age and f2 -> height.


If it wasn't for the bug you could reorder and rename at the same time:

   tableEnv.fromDataStream(input, “f1 as name, f2 as age, f0 as height")
// it reorders the fields of the pojo to the order f1, f2, f0 and gives
them aliases

With a fix it should be possible yes.

Best,

Dawid


On 27/04/2020 17:24, Gyula Fóra wrote:
> Hi Dawid,
>
> Thanks for the clarification on this issue and I agree that there is
> too much going on with these conversions already.
>
> What do you mean by "Unfortunately you can not reorder the fields that
> way." ?
> I can reorder POJO fields even after aliasing and also tuple fields
> (f1, f0) so I assume reordering will still work if tuple and row
> aliasing is fixed.
>
> I will open a JIRA for this!
>
> Thanks!
> Gyula
>
> On Mon, Apr 27, 2020 at 4:58 PM Dawid Wysakowicz
> mailto:dwysakow...@apache.org>> wrote:
>
> Hi Gyula,
>
> I think you are hitting a bug with the naming/aliasing of the
> fields of a Tuple. The bug is in the
> org.apache.flink.table.typeutils.FieldInfoUtils#isReferenceByPosition
> method. As it does not work correctly for aliases. Would you mind
> creating an issue for it?
>
> You should be able to alias the fields as follows:
>
> tableEnv.fromDataStream(input, “name, age, height");
>
> Unfortunately you can not reorder the fields that way.
>
> If you want to flatten/extract nested fields you should be able to
> do that in a subsequent operation. The method fromDataStream is
> supposed to register the entire DataStream as a Table and it does
> not support projections etc.
>
> tableEnv.fromDataStream(input, “name, age, height")
>
> .select("name.f0 as nameF0, age.flatten, ...");
>
> Side note. In my opinion this method (fromDataStream(DataStream,
> Expression/String... fields)) has already too many
> responsibilities and is hard to understand. (You can reorder
> fields, rename fields without alias, rename fields with an alias,
> alias works differently depending of the available fields or type
> etc.). In the long term I'd prefer to come up with a better way of
> creating a Table out of a DataStream.
>
> BTW The way we can fix the renaming + reordering is by changing
> the method I mentioned:
>
>     public static boolean isReferenceByPosition(CompositeType
> ct, Expression[] fields) {
>         if (!(ct instanceof TupleTypeInfoBase)) {
>             return false;
>         }
>
>         List inputNames = Arrays.asList(ct.getFieldNames());
>
>         // Use the by-position mode if no of the fields exists in
> the input.
>         // This prevents confusing cases like ('f2, 'f0, 'myName)
> for a Tuple3 where fields are renamed
>         // by position but the user might assume reordering
> instead of renaming.
>         return Arrays.stream(fields).allMatch(f -> {
>             if (f instanceof UnresolvedCallExpression &&
>                     ((UnresolvedCallExpression) f).getFunctionDefinition() == BuiltInFunctionDefinitions.AS &&
>                     f.getChildren().get(0) instanceof UnresolvedReferenceExpression) {
>                 return false;
>             }
>
>             if (f instanceof UnresolvedReferenceExpression) {
>                 return
> !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
>             }
>
>             return true;
>         });
>     }
>
>
> Best,
>
> Dawid
>
>
> On 27/04/2020 15:57, Gyula Fóra wrote:
>> Hi Leonard,
>>
>> The tuple fields can also be referenced as their POJO names (f0,
>> f1), they can be reordered similar to pojo fields, however you
>> cannot alias them. (If you look at the link I have sent that
>> shows how it is supposed to work but it throws an exception when
>> I try it)
>> Also what I am trying to do at the end is to flatten a nested tuple:
>>
>> Tuple2> -> into 3 columns, lets
>> say name, age, height
>>
>> Normally I would write this: tableEnv.fromDataStream(input, “f0
>> as name, f1.f0 as age, f1.f1 as height");
>> However this doesnt work and there seem to be no way to assign
>> names to the nested tuple columns anyways.
>>
>> For Pojo aliasing works  but still I cannot find a way to unnest
>> a nested object:
>>
>> public static class Person {
>>   public String name;
>>   public public Tuple2 details;
>> }
>>
>> tableEnv.fromDataStream(persons, "name, details.f0 as age,
>> details.f1 as height")
>>
>> this leads to an error: 
>> Field reference expression or alias on field expression expected.
>>
>> 

Re: Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread Benchao Li
Hi izual,

IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
The state is not managed in the UDAF; it's managed by the aggregation operator, and
your UDAF's accumulator will be handled by the operator using state.

izual  wrote on Mon, 27 Apr 2020 at 23:21:

> Thanks, Benchao.
>
> Maybe change the dimension table will work, but this changes a lot,
> include `size/count` is not the column of one dim table.
> I notice that user can define Aggregate Functions[1],  but this page also
> said:
> > Accumulators are automatically backup-ed by Flink’s checkpointing
> mechanism and restored
> So is it right to implement my own COUNT/SUM UDF?
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>
>
>
>
>
>
> At 2020-04-27 17:32:14, "Benchao Li"  wrote:
>
> Hi,
>
> There is indeed a state for the aggregation result, however we cannot
> disable it, it's by design.
> StreamQueryConfig.maxIdleStateRetentionTime can control how long the state
> will be kept.
> If you can ensure the time gap between two records of the same id larger
> than, for example
> 1 min, then setting retention time to 1min can resolve your issue.
> If not, maybe you need to change your dimension table, making it return
> the count directly instead
> of return the details.
>
> izual  wrote on Mon, 27 Apr 2020 at 17:06:
>
>> I implements my DimTable by extends `LookupTableSource`[1], which stores
>> data like:
>>
>> id=1 -> (SH, BJ, SD)
>>
>> id=2 -> (...)
>>
>> and then extends `TableFunction` to return the value corresponding to the
>> lookup keys,and maybe return multi rows, for example, when lookupkeys is
>> id=1, then in the `TableFunction.eval`
>>
>> ```
>>
>> collect('SH')
>>
>> collect('BJ')
>>
>> collect('SD')
>>
>> ```
>>
>>
>> Now I want to get the region'count by id, which is from the tblEvent.id,
>> sql is :
>>
>>
>> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
>> SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
>> tblEvent.id
>>
>>
>> I expect the result of COUNT is always 3 for id = 1, no matter the id=1
>> appears how many times.
>>
>> but the actual result is : 3, 6, 9, ...
>>
>>
>> I think this is bcz the state mechanism behind COUNT, how to turn this
>> off?
>>
>> Or what's the correct use for this?
>> StreamQueryConfig.maxIdleStateRetentionTime or something?
>>
>>
>> The reason not using state in flink:
>> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>>
>>
>>
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Gyula Fóra
Hi Dawid,

Thanks for the clarification on this issue and I agree that there is too
much going on with these conversions already.

What do you mean by "Unfortunately you can not reorder the fields that
way." ?
I can reorder POJO fields even after aliasing and also tuple fields (f1,
f0) so I assume reordering will still work if tuple and row aliasing is
fixed.

I will open a JIRA for this!

Thanks!
Gyula

On Mon, Apr 27, 2020 at 4:58 PM Dawid Wysakowicz 
wrote:

> Hi Gyula,
>
> I think you are hitting a bug with the naming/aliasing of the fields of a
> Tuple. The bug is in the
> org.apache.flink.table.typeutils.FieldInfoUtils#isReferenceByPosition
> method. As it does not work correctly for aliases. Would you mind creating
> an issue for it?
>
> You should be able to alias the fields as follows:
>
> tableEnv.fromDataStream(input, “name, age, height");
>
> Unfortunately you can not reorder the fields that way.
>
> If you want to flatten/extract nested fields you should be able to do that
> in a subsequent operation. The method fromDataStream is supposed to
> register the entire DataStream as a Table and it does not support
> projections etc.
>
> tableEnv.fromDataStream(input, “name, age, height")
>
> .select("name.f0 as nameF0, age.flatten, ...");
>
> Side note. In my opinion this method (fromDataStream(DataStream,
> Expression/String... fields)) has already too many responsibilities and is
> hard to understand. (You can reorder fields, rename fields without alias,
> rename fields with an alias, alias works differently depending of the
> available fields or type etc.). In the long term I'd prefer to come up with
> a better way of creating a Table out of a DataStream.
>
> BTW The way we can fix the renaming + reordering is by changing the method
> I mentioned:
>
> public static boolean isReferenceByPosition(CompositeType ct,
> Expression[] fields) {
> if (!(ct instanceof TupleTypeInfoBase)) {
> return false;
> }
>
> List inputNames = Arrays.asList(ct.getFieldNames());
>
> // Use the by-position mode if no of the fields exists in the
> input.
> // This prevents confusing cases like ('f2, 'f0, 'myName) for a
> Tuple3 where fields are renamed
> // by position but the user might assume reordering instead of
> renaming.
> return Arrays.stream(fields).allMatch(f -> {
> if (f instanceof UnresolvedCallExpression &&
>         ((UnresolvedCallExpression) f).getFunctionDefinition() == BuiltInFunctionDefinitions.AS &&
>         f.getChildren().get(0) instanceof UnresolvedReferenceExpression) {
>     return false;
> }
>
> if (f instanceof UnresolvedReferenceExpression) {
> return
> !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
> }
>
> return true;
> });
> }
>
>
> Best,
>
> Dawid
>
>
> On 27/04/2020 15:57, Gyula Fóra wrote:
>
> Hi Leonard,
>
> The tuple fields can also be referenced as their POJO names (f0, f1), they
> can be reordered similar to pojo fields, however you cannot alias them. (If
> you look at the link I have sent that shows how it is supposed to work but
> it throws an exception when I try it)
> Also what I am trying to do at the end is to flatten a nested tuple:
>
> Tuple2> -> into 3 columns, lets say name,
> age, height
>
> Normally I would write this: tableEnv.fromDataStream(input, “f0 as name,
> f1.f0 as age, f1.f1 as height");
> However this doesnt work and there seem to be no way to assign names to
> the nested tuple columns anyways.
>
> For Pojo aliasing works  but still I cannot find a way to unnest a nested
> object:
>
> public static class Person {
>   public String name;
>   public public Tuple2 details;
> }
>
> tableEnv.fromDataStream(persons, "name, details.f0 as age, details.f1 as
> height")
>
> this leads to an error:
> Field reference expression or alias on field expression expected.
>
> Aliasing fields also doesn't work when converting from Row stream even if
> the column names are provided in the type info.
>
> Cheers,
> Gyula
>
> On Mon, Apr 27, 2020 at 3:33 PM Leonard Xu  wrote:
>
>> Hi,  gyula.fora
>>
>> If you’re trying convert Table from a Tuple DataStream, Alias the filed
>> by `as` expression is no supported yet,
>> because all fields are referenced by position in this point. You can
>> simply alias like following syntax:
>> ```
>> tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), “name, age");
>> ```
>> This should satisfy  your purpose. And back to the 1.10 docs, If you are
>> converting Table from a
>> POJO(assuming the POJO person has two fields name and age) DataStream,
>> Alias the filed by `as` is supported
>> because this point all fields are referenced by name, like:
>> ```
>> tableEnv.fromDataStream(env.fromElements(new Person(“foo", 12)), “age as
>> age_alias, name as user_name,");
>> ```
>>
>>
>> Best,
>> Leonard, Xu
>
>


Re:Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread izual
Thanks, Benchao.


Maybe change the dimension table will work, but this changes a lot, include 
`size/count` is not the column of one dim table.
I notice that user can define Aggregate Functions[1],  but this page also said:
> Accumulators are automatically backup-ed by Flink’s checkpointing mechanism 
> and restored
So is it right to implement my own COUNT/SUM UDF?


[1]. 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions


















At 2020-04-27 17:32:14, "Benchao Li"  wrote:

Hi,


There is indeed a state for the aggregation result, however we cannot disable 
it, it's by design.
StreamQueryConfig.maxIdleStateRetentionTime can control how long the state will 
be kept.
If you can ensure the time gap between two records of the same id larger than, 
for example 
1 min, then setting retention time to 1min can resolve your issue.
If not, maybe you need to change your dimension table, making it return the 
count directly instead 
of return the details.


izual  wrote on Mon, 27 Apr 2020 at 17:06:


I implements my DimTable by extends `LookupTableSource`[1], which stores data 
like:

id=1 -> (SH, BJ, SD)

id=2 -> (...)

and then extends `TableFunction` to return the value corresponding to the 
lookup keys,and maybe return multi rows, for example, when lookupkeys is id=1, 
then in the `TableFunction.eval`

```

collect('SH')

collect('BJ')

collect('SD')

```




Now I want to get the region'count by id, which is from the tblEvent.id, sql is 
:




SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR SYSTEM 
AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id




I expect the result of COUNT is always 3 for id = 1, no matter the id=1 appears 
how many times.

but the actual result is : 3, 6, 9, ...




I think this is bcz the state mechanism behind COUNT, how to turn this off?

Or what's the correct use for this? StreamQueryConfig.maxIdleStateRetentionTime 
or something?




The reason not using state in flink: 
http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E

[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups





 





--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Dawid Wysakowicz
Hi Gyula,

I think you are hitting a bug with the naming/aliasing of the fields of
a Tuple. The bug is in the
org.apache.flink.table.typeutils.FieldInfoUtils#isReferenceByPosition
method, as it does not work correctly for aliases. Would you mind
creating an issue for it?

You should be able to alias the fields as follows:

tableEnv.fromDataStream(input, “name, age, height");

Unfortunately you can not reorder the fields that way.

If you want to flatten/extract nested fields you should be able to do
that in a subsequent operation. The method fromDataStream is supposed to
register the entire DataStream as a Table and it does not support
projections etc.

tableEnv.fromDataStream(input, “name, age, height")

.select("name.f0 as nameF0, age.flatten, ...");

Side note. In my opinion this method (fromDataStream(DataStream,
Expression/String... fields)) has already too many responsibilities and
is hard to understand. (You can reorder fields, rename fields without
alias, rename fields with an alias, alias works differently depending on
the available fields or type etc.). In the long term I'd prefer to come
up with a better way of creating a Table out of a DataStream.

BTW The way we can fix the renaming + reordering is by changing the
method I mentioned:

    public static boolean isReferenceByPosition(CompositeType ct,
Expression[] fields) {
        if (!(ct instanceof TupleTypeInfoBase)) {
            return false;
        }

        List inputNames = Arrays.asList(ct.getFieldNames());

        // Use the by-position mode if no of the fields exists in the input.
        // This prevents confusing cases like ('f2, 'f0, 'myName) for a
Tuple3 where fields are renamed
        // by position but the user might assume reordering instead of
renaming.
        return Arrays.stream(fields).allMatch(f -> {
            if (f instanceof UnresolvedCallExpression &&
                    ((UnresolvedCallExpression) f).getFunctionDefinition() == BuiltInFunctionDefinitions.AS &&
                    f.getChildren().get(0) instanceof UnresolvedReferenceExpression) {
                return false;
            }

            if (f instanceof UnresolvedReferenceExpression) {
                return
!inputNames.contains(((UnresolvedReferenceExpression) f).getName());
            }

            return true;
        });
    }


Best,

Dawid


On 27/04/2020 15:57, Gyula Fóra wrote:
> Hi Leonard,
>
> The tuple fields can also be referenced as their POJO names (f0, f1),
> they can be reordered similar to pojo fields, however you cannot alias
> them. (If you look at the link I have sent that shows how it is
> supposed to work but it throws an exception when I try it)
> Also what I am trying to do at the end is to flatten a nested tuple:
>
> Tuple2> -> into 3 columns, lets say
> name, age, height
>
> Normally I would write this: tableEnv.fromDataStream(input, “f0 as
> name, f1.f0 as age, f1.f1 as height");
> However this doesnt work and there seem to be no way to assign names
> to the nested tuple columns anyways.
>
> For Pojo aliasing works  but still I cannot find a way to unnest a
> nested object:
>
> public static class Person {
>   public String name;
>   public public Tuple2 details;
> }
>
> tableEnv.fromDataStream(persons, "name, details.f0 as age, details.f1
> as height")
>
> this leads to an error: 
> Field reference expression or alias on field expression expected.
>
> Aliasing fields also doesn't work when converting from Row stream even
> if the column names are provided in the type info.
>
> Cheers,
> Gyula
>
> On Mon, Apr 27, 2020 at 3:33 PM Leonard Xu  > wrote:
>
> Hi,  gyula.fora
>
> If you’re trying convert Table from a Tuple DataStream, Alias the
> filed by `as` expression is no supported yet,
> because all fields are referenced by position in this point. You
> can simply alias like following syntax:
> ```
> tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)),
> “name, age");
> ```
> This should satisfy  your purpose. And back to the 1.10 docs, If
> you are converting Table from a
> POJO(assuming the POJO person has two fields name and age)
> DataStream, Alias the filed by `as` is supported
> because this point all fields are referenced by name, like:
> ```
> tableEnv.fromDataStream(env.fromElements(new Person(“foo", 12)),
> “age as age_alias, name as user_name,");
> ```
>
>
> Best,
> Leonard, Xu
>


signature.asc
Description: OpenPGP digital signature


Re: History Server Not Showing Any Jobs - File Not Found?

2020-04-27 Thread Chesnay Schepler
If historyserver.web.tmpdir is not set then java.io.tmpdir is used, so 
that should be fine.


What are the contents of /local/scratch/flink_historyserver_tmpdir?
I assume there are already archives in HDFS?

On 27/04/2020 16:02, Hailu, Andreas wrote:


My machine’s /tmp directory is not large enough to support the 
archived files, so I changed my java.io.tmpdir to be in some other 
location which is significantly larger. I hadn’t set anything for 
historyserver.web.tmpdir, so I suspect it was still pointing at /tmp. 
I just tried setting historyserver.web.tmpdir to the same location as 
my java.io.tmpdir location, but I’m afraid I’m still seeing the 
following issue:


2020-04-27 09:37:42,904 [nioEventLoopGroup-3-4] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/overview.json from classloader


2020-04-27 09:37:42,906 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader


flink-conf.yaml for reference:

jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/

historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/

historyserver.web.tmpdir: /local/scratch/flink_historyserver_tmpdir/

Did you have anything else in mind when you said pointing somewhere funny?

*// *ah**

*From:*Chesnay Schepler 
*Sent:* Monday, April 27, 2020 5:56 AM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

overview.json is a generated file that is placed in the local 
directory controlled by /historyserver.web.tmpdir/.


Have you configured this option to point to some non-local filesystem? 
(Or if not, is the java.io.tmpdir property pointing somewhere funny?)


On 24/04/2020 18:24, Hailu, Andreas wrote:

I’m having a further look at the code in
HistoryServerStaticFileServerHandler - is there an assumption
about where overview.json is supposed to be located?

*// *ah

*From:*Hailu, Andreas [Engineering]
*Sent:* Wednesday, April 22, 2020 1:32 PM
*To:* 'Chesnay Schepler' 
; Hailu, Andreas [Engineering]

; user@flink.apache.org

*Subject:* RE: History Server Not Showing Any Jobs - File Not Found?

Hi Chesnay, thanks for responding. We’re using Flink 1.9.1. I
enabled DEBUG level logging and this is something relevant I see:

2020-04-22 13:25:52,566
[Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG DFSInputStream
- Connecting to datanode 10.79.252.101:1019

2020-04-22 13:25:52,567
[Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG
SaslDataTransferClient - SASL encryption trust check:
localHostTrusted = false, remoteHostTrusted = false

2020-04-22 13:25:52,567
[Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG
SaslDataTransferClient - SASL client skipping handshake in secured
configuration with privileged port for addr = /10.79.252.101,
datanodeId = DatanodeI


nfoWithStorage[10.79.252.101:1019,DS-7f4ec55d-7c5f-4a0e-b817-d9e635480b21,DISK]

*2020-04-22 13:25:52,571
[Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG DFSInputStream
- DFSInputStream has been closed already*

*2020-04-22 13:25:52,573 [nioEventLoopGroup-3-6] DEBUG
HistoryServerStaticFileServerHandler - Unable to load requested
file /jobs/overview.json from classloader*

2020-04-22 13:25:52,576 [IPC Parameter Sending Thread #0] DEBUG
Client$Connection$3 - IPC Client (1578587450) connection to
d279536-002.dc.gs.com/10.59.61.87:8020 from d...@gs.com
 sending #1391

Aside from that, it looks like a lot of logging around datanodes
and block location metadata. Did I miss something in my classpath,
perhaps? If so, do you have a suggestion on what I could try?

*// *ah

*From:*Chesnay Schepler mailto:ches...@apache.org>>
*Sent:* Wednesday, April 22, 2020 2:16 AM
*To:* Hailu, Andreas [Engineering] mailto:andreas.ha...@ny.email.gs.com>>; user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

Which Flink version are you using?

Have you checked the history server logs after enabling debug logging?

On 21/04/2020 17:16, Hailu, Andreas [Engineering] wrote:

Hi,

I’m trying to set up the History Server, but none of my
applications are showing up in the Web UI. Looking at the
console, I see that all of the calls to /overview return the
following 404 response: {"errors":["File not found."]}.

I’ve set up my configuration as follows:

JobManager Archive directory:

*jobmanager.archive.fs.dir*:
hdfs:///user/p2epda/lake/delp_qa/flink_hs/

-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs

Found 

Re: Flink 1.9.2 why always checkpoint expired

2020-04-27 Thread Congxian Qiu
Hi
The image is not very clear.
For RocksDBStateBackend, do you enable incremental checkpoint?

Currently, checkpoint on TM side contains some steps:
1 barrier align
2 sync snapshot
3 async snapshot

For an expired checkpoint, could you please check the tasks in the first
operator of the DAG to find out why it timed out:
- is there any backpressure? (affects barrier alignment)
- is the disk/network utilization high? (affects steps 2 & 3)
- is the task thread too busy? (this can lead to the barrier being processed
late sometimes)

you can enable the debug log to find out more info.

Best,
Congxian
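
As a side note, if incremental checkpoints are not enabled yet, this is roughly how they are switched on programmatically (the checkpoint URI is a placeholder; the same can be done via state.backend.incremental: true in flink-conf.yaml):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the second constructor argument enables incremental checkpoints (the constructor declares IOException)
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
env.enableCheckpointing(60_000L);                          // checkpoint every 60 s
env.getCheckpointConfig().setCheckpointTimeout(600_000L);  // give slow snapshots more time before they expire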


qq <471237...@qq.com> wrote on Mon, 27 Apr 2020 at 12:34:

> Hi all,
>
> Why my flink checkpoint always expired, I used RocksDB checkpoint,
> and I can’t get any useful messages for this. Could you help me ? Thanks
> very much.
>
>
>
>


RE: History Server Not Showing Any Jobs - File Not Found?

2020-04-27 Thread Hailu, Andreas
My machine's /tmp directory is not large enough to support the archived files, 
so I changed my java.io.tmpdir to be in some other location which is 
significantly larger. I hadn't set anything for historyserver.web.tmpdir, so I 
suspect it was still pointing at /tmp. I just tried setting 
historyserver.web.tmpdir to the same location as my java.io.tmpdir location, 
but I'm afraid I'm still seeing the following issue:

2020-04-27 09:37:42,904 [nioEventLoopGroup-3-4] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/overview.json from classloader
2020-04-27 09:37:42,906 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader

flink-conf.yaml for reference:
jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
historyserver.web.tmpdir: /local/scratch/flink_historyserver_tmpdir/

Did you have anything else in mind when you said pointing somewhere funny?

// ah

From: Chesnay Schepler 
Sent: Monday, April 27, 2020 5:56 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?


overview.json is a generated file that is placed in the local directory 
controlled by historyserver.web.tmpdir.

Have you configured this option to point to some non-local filesystem? (Or if 
not, is the java.io.tmpdir property pointing somewhere funny?)
On 24/04/2020 18:24, Hailu, Andreas wrote:
I'm having a further look at the code in HistoryServerStaticFileServerHandler - 
is there an assumption about where overview.json is supposed to be located?

// ah

From: Hailu, Andreas [Engineering]
Sent: Wednesday, April 22, 2020 1:32 PM
To: 'Chesnay Schepler' ; Hailu, 
Andreas [Engineering] 
; 
user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Hi Chesnay, thanks for responding. We're using Flink 1.9.1. I enabled DEBUG 
level logging and this is something relevant I see:

2020-04-22 13:25:52,566 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - Connecting to datanode 10.79.252.101:1019
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, 
remoteHostTrusted = false
2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
SaslDataTransferClient - SASL client skipping handshake in secured 
configuration with privileged port for addr = /10.79.252.101, datanodeId = 
DatanodeI
nfoWithStorage[10.79.252.101:1019,DS-7f4ec55d-7c5f-4a0e-b817-d9e635480b21,DISK]
2020-04-22 13:25:52,571 [Flink-HistoryServer-ArchiveFetcher-thread-1] DEBUG 
DFSInputStream - DFSInputStream has been closed already
2020-04-22 13:25:52,573 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader
2020-04-22 13:25:52,576 [IPC Parameter Sending Thread #0] DEBUG 
Client$Connection$3 - IPC Client (1578587450) connection to 
d279536-002.dc.gs.com/10.59.61.87:8020 from d...@gs.com 
sending #1391

Aside from that, it looks like a lot of logging around datanodes and block 
location metadata. Did I miss something in my classpath, perhaps? If so, do you 
have a suggestion on what I could try?

// ah

From: Chesnay Schepler mailto:ches...@apache.org>>
Sent: Wednesday, April 22, 2020 2:16 AM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

Which Flink version are you using?
Have you checked the history server logs after enabling debug logging?

On 21/04/2020 17:16, Hailu, Andreas [Engineering] wrote:
Hi,

I'm trying to set up the History Server, but none of my applications are 
showing up in the Web UI. Looking at the console, I see that all of the calls 
to /overview return the following 404 response: {"errors":["File not found."]}.

I've set up my configuration as follows:

JobManager Archive directory:
jobmanager.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/
-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs
Found 44282 items
-rw-r-   3 delp datalake_admin_dev  50569 2020-03-21 23:17 
/user/p2epda/lake/delp_qa/flink_hs/000144dba9dc0f235768a46b2f26e936
-rw-r-   3 delp datalake_admin_dev  49578 2020-03-03 08:45 
/user/p2epda/lake/delp_qa/flink_hs/000347625d8128ee3fd0b672018e38a5
-rw-r-   3 delp datalake_admin_dev  50842 2020-03-24 15:19 
/user/p2epda/lake/delp_qa/flink_hs/0004be6ce01ba9677d1eb619ad0fa757
...
...

History Server will fetch the archived jobs from the same location:
historyserver.archive.fs.dir: hdfs:///user/p2epda/lake/delp_qa/flink_hs/

So I'm able 

Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Gyula Fóra
Hi Leonard,

The tuple fields can also be referenced by their POJO names (f0, f1); they
can be reordered similarly to POJO fields, however you cannot alias them. (The
link I have sent shows how it is supposed to work, but
it throws an exception when I try it.)
Also what I am trying to do at the end is to flatten a nested tuple:

Tuple2> -> into 3 columns, lets say name,
age, height

Normally I would write this: tableEnv.fromDataStream(input, “f0 as name,
f1.f0 as age, f1.f1 as height");
However this doesn't work, and there seems to be no way to assign names to the
nested tuple columns anyway.

For POJOs aliasing works, but I still cannot find a way to unnest a nested
object:

public static class Person {
  public String name;
  public public Tuple2 details;
}

tableEnv.fromDataStream(persons, "name, details.f0 as age, details.f1 as
height")

this leads to an error:
Field reference expression or alias on field expression expected.

Aliasing fields also doesn't work when converting from Row stream even if
the column names are provided in the type info.

Cheers,
Gyula

On Mon, Apr 27, 2020 at 3:33 PM Leonard Xu  wrote:

> Hi,  gyula.fora
>
> If you’re trying convert Table from a Tuple DataStream, Alias the filed by
> `as` expression is no supported yet,
> because all fields are referenced by position in this point. You can
> simply alias like following syntax:
> ```
> tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), “name, age");
> ```
> This should satisfy  your purpose. And back to the 1.10 docs, If you are
> converting Table from a
> POJO(assuming the POJO person has two fields name and age) DataStream,
> Alias the filed by `as` is supported
> because this point all fields are referenced by name, like:
> ```
> tableEnv.fromDataStream(env.fromElements(new Person(“foo", 12)), “age as
> age_alias, name as user_name,");
> ```
>
>
> Best,
> Leonard, Xu


Updating Closure Variables

2020-04-27 Thread Senthil Kumar
Hello Flink Community!

We have a flink streaming application with a particular use case where a 
closure variable Set is used in a filter function.

Currently, the variable is set at startup time.

It’s populated from an S3 location, where several files exist (we consume the 
one with the last updated timestamp).

Is it possible to periodically update (say once every 24 hours) this closure 
variable?

My initial research indicates that we cannot update closure variables and 
expect them to show up at the workers.

There seems to be something called BroadcastStream in Flink. 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Is that the right approach? I would like some kind of a confirmation before I 
go deeper into it.

cheers
Kumar


Re: Cannot map nested Tuple fields to table columns

2020-04-27 Thread Leonard Xu
Hi,  gyula.fora

If you’re trying to convert a Table from a Tuple DataStream, aliasing the field with an `as` 
expression is not supported yet,
because at this point all fields are referenced by position. You can simply 
rename them with the following syntax:
```
tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), “name, age");
```
This should satisfy your purpose. And back to the 1.10 docs: if you are 
converting a Table from a 
POJO DataStream (assuming the POJO Person has two fields, name and age), aliasing 
the field with `as` is supported, 
because at this point all fields are referenced by name, like:
```
tableEnv.fromDataStream(env.fromElements(new Person(“foo", 12)), “age as 
age_alias, name as user_name,");
```


Best,
Leonard, Xu
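
Putting the two referencing modes next to each other, a small sketch (Person, persons and the nested Tuple2 details field are taken from the discussion above; the follow-up projection uses the composite get() expression and may need adjusting for your exact version):

// Tuple input: fields are referenced by position, so plain renaming works, but `as` currently does not
Table fromTuple = tableEnv.fromDataStream(
        env.fromElements(Tuple2.of("a", 1)), "name, age");

// POJO input: fields are referenced by name, so `as` aliases are supported
Table fromPojo = tableEnv.fromDataStream(
        env.fromElements(new Person("foo", 12)), "age as age_alias, name as user_name");

// Nested fields can then be extracted in a subsequent projection
Table flattened = tableEnv.fromDataStream(persons, "name, details")
        .select("name, details.get('f0') as age, details.get('f1') as height");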

Re: Streaming Job eventually begins failing during checkpointing

2020-04-27 Thread Yu Li
Sorry, just noticed this thread...

@Stephan I cannot remember the discussion but I think it's an interesting
topic, will find some time to consider it (unregister states).

@Eleanore Glad to know that Beam community has fixed it and thanks for the
reference.

Best Regards,
Yu


On Sun, 26 Apr 2020 at 03:10, Eleanore Jin  wrote:

> Hi All,
>
> I think the Beam Community fixed this issue:
> https://github.com/apache/beam/pull/11478
>
> Thanks!
> Eleanore
>
> On Thu, Apr 23, 2020 at 4:24 AM Stephan Ewen  wrote:
>
>> If something requires Beam to register a new state each time, then this
>> is tricky, because currently you cannot unregister states from Flink.
>>
>> @Yu @Yun I remember chatting about this (allowing to explicitly
>> unregister states so they get dropped from successive checkpoints) at some
>> point, but I could not find a jira ticket for this. Do you remember what
>> the status of that discussion is?
>>
>> On Thu, Apr 16, 2020 at 6:37 PM Stephen Patel  wrote:
>>
>>> I posted to the beam mailing list:
>>> https://lists.apache.org/thread.html/rb2ebfad16d85bcf668978b3defd442feda0903c20db29c323497a672%40%3Cuser.beam.apache.org%3E
>>>
>>> I think this is related to a Beam feature called RequiresStableInput
>>> (which my pipeline is using).  It will create a new operator (or keyed)
>>> state per checkpoint.  I'm not sure that there are any parameters that I
>>> have control over to tweak it's behavior (apart from increasing the
>>> checkpoint interval to let the pipeline run longer before building up that
>>> many states).
>>>
>>> Perhaps this is something that can be fixed (maybe by unregistering
>>> Operator States after they aren't used any more in the RequiresStableInput
>>> code).  It seems to me that this isn't a Flink issue, but rather a Beam
>>> issue.
>>>
>>> Thanks for pointing me in the right direction.
>>>
>>> On Thu, Apr 16, 2020 at 11:29 AM Yun Tang  wrote:
>>>
 Hi Stephen

 I think the state name [1] which would be changed every time might the
 root cause. I am not familiar with Beam code, would it be possible to
 create so many operator states? Did you configure some parameters wrongly?


 [1]
 https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L95

 Best
 Yun Tang
 --
 *From:* Stephen Patel 
 *Sent:* Thursday, April 16, 2020 22:30
 *To:* Yun Tang 
 *Cc:* user@flink.apache.org 
 *Subject:* Re: Streaming Job eventually begins failing during
 checkpointing

 Correction.  I've actually found a place where it potentially might be
 creating a new operator state per checkpoint:

 https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
 https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149

 This gives me something I can investigate locally at least.

 On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel 
 wrote:

 I can't say that I ever call that directly.  The beam library that I'm
 using does call it in a couple places:
 https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429

 But it seems to be the same descriptor every time.  Is that limit per
 operator?  That is, can each operator host up to 32767 operator/broadcast
 states?  I assume that's by name?

 On Wed, Apr 15, 2020 at 10:46 PM Yun Tang  wrote:

 Hi  Stephen

 This is not related with RocksDB but with default on-heap operator
 state backend. From your exception stack trace, you have created too many
 operator states (more than 32767).
 How do you call context.getOperatorStateStore().getListState or
 context.getOperatorStateStore().getBroadcastState ? Did you pass a
 different operator state descriptor each time?

 Best
 Yun Tang
 --
 *From:* Stephen Patel 
 *Sent:* Thursday, April 16, 2020 2:09
 *To:* user@flink.apache.org 
 *Subject:* Streaming Job eventually begins failing during checkpointing

 I've got a flink (1.8.0, emr-5.26) streaming job running on yarn.  It's
 configured to use rocksdb, and checkpoint once a minute to hdfs.  This job
 operates just fine for around 20 days, and then begins failing with this
 exception (it fails, restarts, and fails again, repeatedly):

 2020-04-15 13:15:02,920 INFO
  

Flink Lookup Filter Pushdown

2020-04-27 Thread forideal
Hello, my friend.

I have a dimension table.

create table dim_u_score (u_id bigint, varchar, score_a double, score_b double) with {xxx}

In one scenario, the lookup condition is the filter score_a > 0.9.

In another scenario, the lookup condition is the filter score_b > 1.

In Flink, at present, lookup join can use on to pass key values, such as

select score_a ... left join ... source_table.u_id = dim_u_score.u_id

If that is the case, what should I do to apply the extra filter?

If it is not possible, can I instead create multiple tables, each with its own filter condition, and use the appropriate one?

such as

create table dim_u_score_filter_a (u_id bigint, varchar, score_a double, score_b double)
  with {"filter_condition" = "score_a > 0.9"}

create table dim_u_score_filter_b (u_id bigint, varchar, score_a double, score_b double)
  with {"filter_condition" = "score_b > 1"}

Then, during the lookup, the filter would be pushed down to the specific execution engine,
which completes the lookup filtering.

Cannot map nested Tuple fields to table columns

2020-04-27 Thread Gyula Fóra
Hi All!

I was trying to flatten a nested tuple into named columns with the
fromDataStream method and I hit some problems with mapping tuple fields to
column names.

It seems like the `f0 as ColumnName` kind of expressions are not parsed
correctly.

It is very easy to reproduce:
tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), "f0 as name,
f1 as age");

This leads to the following 2 kinds of errors depending on how you write
it:
 - Alias 'name' is not allowed if other fields are referenced by position.
 - Could not parse expression at column 7: `(' expected but `'' found
f0 as 'name', f1 as 'age'

I could not find any test cases that would use this logic so I wonder if I
am doing something wrong here, the docs show that this should be possible:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#tuples-scala-and-java-and-case-classes-scala-only

I was actually trying to extract nested tuple fields this way but I did not
get that far. It also seems to fail for Row data types.

What am I doing wrong?

Gyula


Re: Job appears frozen (任务假死)

2020-04-27 Thread Weihua Hu
What do you have jobmanager.execution.failover-strategy configured to? If it is region, the job 
will not switch to a failed state just because a Task fails.
You can open the job topology in the Web UI to check the status of each individual task.


Best
Weihua Hu

> On 26 Apr 2020 at 11:43, yanggang_it_job  wrote:
> 
> Thanks for your reply. This problem is somewhat similar to the scenario you pointed me to, but there are still some differences.
> I just tried several ways to share them, but the images still seem to be inaccessible.
> Below is a detailed description of the abnormal situation:
> 1. My job reads from three Kafka topics, implements left-join semantics via coGroup, 
> defines a sliding window of (600, 10), and finally processes the data in a CoGroupFunction.
> 2. The exception occurred in one of the CoGroupFunctions (Window(TumblingEventTimeWindows(60), 
> EventTimeTrigger, CoGroupWindowFunction) (15/200)), which reported an OOM. The stack trace is as follows:
>  java.lang.OutOfMemoryError: unable to create new native thread
>at java.lang.Thread.start0(NativeMethod)
>at java.lang.Thread.start(Thread.java:717)
>at 
> java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
>at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1237)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
>at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:137)
>at 
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>at java.lang.Thread.run(Thread.java:748)
> 
> 3. Apart from this operator's vertex being FAILED, all other vertices are CANCELED, and the JobManager status is RUNNING.
> 
> 
> Normally when this error occurs, the JM finds a suitable machine to restart the TM, or the job exits after several failed attempts.
> But now the job status is RUNNING, and although it is RUNNING it no longer writes any data to the downstream storage.
> 
> 
> 
> 
> 
> 
> 
> thanks
> 
> 
> At 2020-04-26 11:01:04, "Zhefu PENG"  wrote:
>> The image seems to be broken and cannot be viewed. Is your case similar to the scenarios described in these two threads?
>> 
>> [1] http://apache-flink.147419.n8.nabble.com/flink-kafka-td2386.html
>> [2]  http://apache-flink.147419.n8.nabble.com/Flink-Kafka-td2390.html
>> On Sun, Apr 26, 2020 at 10:58 yanggang_it_job 
>> wrote:
>> 
>>> 1. Flink UI screenshot:
>>> My job is started with -p 200 -ys 5, which means 40 TMs should be running; when the job ran normally there were indeed 40 TMs, but now only 1 TM is running.
>>> Meanwhile, judging from the data sync timestamps, the job stopped writing data two days ago, yet its status still shows RUNNING.
>>> My understanding is that when TM allocation fails, the job should exit and set its status to FINISHED, but the actual behavior is as described above.
>>> Why does this happen?
>>> 
>>> thanks
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 



Re: flink 1.10 memory question

2020-04-27 Thread Jiacheng Jiang
Thank you Xintong. So -XX:MaxDirectMemorySize covers Framework + Task Off-Heap + Network Memory,
i.e. MaxDirectMemorySize = Framework Off-Heap + Task Off-Heap + Network
Memory. … MaxDirectMemorySize = 10 … 10 = 1 + 1 + 8 … 10 = 1 + 8 + 1, … 2 … 3 … task
off-heap … job …



----
??: "Xintong Song"https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_detail.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_trouble.html#outofmemoryerror-direct-buffer-memory
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_setup.html#%E6%89%98%E7%AE%A1%E5%86%85%E5%AD%98

On Mon, Apr 27, 2020 at 10:31 AM Jiacheng Jiang <920334...@qq.com>
wrote:

hi

… standalone … flink job … TM … 8g … flink 1.10 … taskmanager.memory.flink.size … 10g … the job … java.lang.OutOfMemoryError:
Direct buffer memory … Direct
Memory … flink 1.10 standalone cluster … slot:
"A TaskManager with three slots, for example, will
dedicate 1/3 of its managed memory to each slot." … does "its managed
memory" refer to taskmanager.memory.managed.size?

best
Jungle
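
For reference, the Flink 1.10 options behind the components discussed above can be set explicitly in flink-conf.yaml (the values below are purely illustrative): framework off-heap, task off-heap and network memory are what count towards -XX:MaxDirectMemorySize, while managed memory is allocated as native memory and is not part of that limit.

taskmanager.memory.flink.size: 10g
taskmanager.memory.framework.off-heap.size: 128m   # counted into MaxDirectMemorySize
taskmanager.memory.task.off-heap.size: 1g          # counted into MaxDirectMemorySize
taskmanager.memory.network.fraction: 0.1           # network buffers, also direct memory
taskmanager.memory.managed.size: 2g                # native memory (e.g. RocksDB), not direct memory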

Re: When is data later than the watermark discarded?

2020-04-27 Thread Benchao Li
Right, if it is a plain group by, that indeed cannot be done.

lec ssmi  wrote on Mon, 27 Apr 2020 at 17:59:

> Thanks for the answer.
> But there is a problem with that: suppose I do not use a window and instead implement the aggregation
> with a plain group by hour, where hour is a string column. I also need to discard data that is later than the watermark,
> and in SQL that cannot be done inside a TableAggregateFunction.
> The DataStream API has ProcessFunction, so there it can of course be implemented.
>
> Benchao Li  wrote on Mon, 27 Apr 2020 at 17:47:
>
> > Hi,
> >
> > Your understanding is correct: only time-related operators may discard late data, the typical ones being Window and CEP.
> > Ordinary operators such as Map and Filter have no notion of time and will not discard data.
> >
> > lec ssmi  wrote on Mon, 27 Apr 2020 at 17:38:
> >
> > > Hi:
> > >   If there is data later than the watermark, is it only time-related operators such as time windows
> > > that automatically filter out such data? Or do other operators such as map and join also drop it automatically without processing it?
> > >  It feels similar to ProcessFunction, which provides a method to get the currentWatermark; whether to process late data or not is entirely up to your own code logic.
> > >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


"Fill in" notification messages based on event time watermark

2020-04-27 Thread Manas Kale
Hi,
I have an upstream operator that outputs device state transition messages
with event timestamps. Meaning it only emits output when a transition takes
place.
For example,
state1 @ 1 PM
state2 @ 2 PM
and so on.

*Using a downstream operator, I want to emit notification messages as per
some configured periodicity.* For example, if periodicity = 20 min, in the
above scenario this operator will output :
state1 notification @ 1PM
state1 notification @ 1.20PM
state1 notification @ 1.40PM
 ...

*Now the main issue is that I want this to be driven by the watermark and
not by transition events received from upstream. *Meaning I would like to
see notification events as soon as the watermark crosses their timestamps;
*not* when the next transition event arrives at the operator (which could
be hours later, as above).

My first solution, using a KeyedProcessFunction and timers, did not work as
expected because the order in which transition events arrived at this
operator was non-deterministic. To elaborate, assume a
setAutoWatermarkInterval of 10 seconds.
If we get transition events :
state1 @ 1sec
state2 @ 3 sec
state3 @ 5 sec
state1 @ 8 sec
the order in which these events arrived at my keyedProcessFunction was not
fixed. To solve this, these messages need to be sorted on event time, which
led me to my second solution.

My second solution, using an event-time tumbling window with size =
setAutoWatermarkInterval, also does not work. I sorted the accumulated events
in the window and applied the notification-generation logic to them in order.
However, I assumed that windows are created even if there are no elements.
Since this is not the case, this solution generates notifications only when
the next state transition message arrives, which could be hours later.

Does anyone have any suggestions on how I can implement this?
Thanks!
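
For what it's worth, a minimal sketch of the timer-driven variant of your first approach (StateTransition, Notification and their accessors are hypothetical, and it glosses over deleting timers of a superseded state and over the out-of-order arrival you describe):

public class PeriodicNotifier
        extends KeyedProcessFunction<String, StateTransition, Notification> {

    private static final long PERIOD_MS = 20 * 60 * 1000L; // illustrative periodicity

    private transient ValueState<String> currentState;

    @Override
    public void open(Configuration parameters) {
        currentState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("currentState", String.class));
    }

    @Override
    public void processElement(StateTransition t, Context ctx, Collector<Notification> out)
            throws Exception {
        currentState.update(t.getState());
        // start the periodic notifications relative to the transition's event time
        ctx.timerService().registerEventTimeTimer(t.getTimestamp() + PERIOD_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Notification> out)
            throws Exception {
        String state = currentState.value();
        if (state != null) {
            out.collect(new Notification(ctx.getCurrentKey(), state, timestamp));
            // re-register: fires as soon as the watermark passes the next period boundary
            ctx.timerService().registerEventTimeTimer(timestamp + PERIOD_MS);
        }
    }
}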


Re: When is data later than the watermark discarded?

2020-04-27 Thread lec ssmi
Thanks for the answer.
But there is a problem with that: suppose I do not use a window and instead implement the aggregation
with a plain group by hour, where hour is a string column. I also need to discard data that is later than the watermark,
and in SQL that cannot be done inside a TableAggregateFunction.
The DataStream API has ProcessFunction, so there it can of course be implemented.

Benchao Li  wrote on Mon, 27 Apr 2020 at 17:47:

> Hi,
>
> Your understanding is correct: only time-related operators may discard late data, the typical ones being Window and CEP.
> Ordinary operators such as Map and Filter have no notion of time and will not discard data.
>
> lec ssmi  wrote on Mon, 27 Apr 2020 at 17:38:
>
> > Hi:
> >   If there is data later than the watermark, is it only time-related operators such as time windows
> > that automatically filter out such data? Or do other operators such as map and join also drop it automatically without processing it?
> >  It feels similar to ProcessFunction, which provides a method to get the currentWatermark; whether to process late data or not is entirely up to your own code logic.
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: History Server Not Showing Any Jobs - File Not Found?

2020-04-27 Thread Chesnay Schepler
overview.json is a generated file that is placed in the local directory 
controlled by /historyserver.web.tmpdir/.


Have you configured this option to point to some non-local filesystem? 
(Or if not, is the java.io.tmpdir property pointing somewhere funny?)


On 24/04/2020 18:24, Hailu, Andreas wrote:


I’m having a further look at the code in 
HistoryServerStaticFileServerHandler - is there an assumption about 
where overview.json is supposed to be located?


*// *ah**

*From:*Hailu, Andreas [Engineering]
*Sent:* Wednesday, April 22, 2020 1:32 PM
*To:* 'Chesnay Schepler' ; Hailu, Andreas 
[Engineering] ; user@flink.apache.org

*Subject:* RE: History Server Not Showing Any Jobs - File Not Found?

Hi Chesnay, thanks for responding. We’re using Flink 1.9.1. I enabled 
DEBUG level logging and this is something relevant I see:


2020-04-22 13:25:52,566 [Flink-HistoryServer-ArchiveFetcher-thread-1] 
DEBUG DFSInputStream - Connecting to datanode 10.79.252.101:1019


2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] 
DEBUG SaslDataTransferClient - SASL encryption trust check: 
localHostTrusted = false, remoteHostTrusted = false


2020-04-22 13:25:52,567 [Flink-HistoryServer-ArchiveFetcher-thread-1] 
DEBUG SaslDataTransferClient - SASL client skipping handshake in 
secured configuration with privileged port for addr = /10.79.252.101, 
datanodeId = DatanodeI


nfoWithStorage[10.79.252.101:1019,DS-7f4ec55d-7c5f-4a0e-b817-d9e635480b21,DISK]

*2020-04-22 13:25:52,571 [Flink-HistoryServer-ArchiveFetcher-thread-1] 
DEBUG DFSInputStream - DFSInputStream has been closed already*


*2020-04-22 13:25:52,573 [nioEventLoopGroup-3-6] DEBUG 
HistoryServerStaticFileServerHandler - Unable to load requested file 
/jobs/overview.json from classloader*


2020-04-22 13:25:52,576 [IPC Parameter Sending Thread #0] DEBUG 
Client$Connection$3 - IPC Client (1578587450) connection to 
d279536-002.dc.gs.com/10.59.61.87:8020 from d...@gs.com 
 sending #1391


Aside from that, it looks like a lot of logging around datanodes and 
block location metadata. Did I miss something in my classpath, 
perhaps? If so, do you have a suggestion on what I could try?


*// *ah**

*From:*Chesnay Schepler mailto:ches...@apache.org>>
*Sent:* Wednesday, April 22, 2020 2:16 AM
*To:* Hailu, Andreas [Engineering] >; user@flink.apache.org 


*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

Which Flink version are you using?

Have you checked the history server logs after enabling debug logging?

On 21/04/2020 17:16, Hailu, Andreas [Engineering] wrote:

Hi,

I’m trying to set up the History Server, but none of my
applications are showing up in the Web UI. Looking at the console,
I see that all of the calls to /overview return the following 404
response: {"errors":["File not found."]}.

I’ve set up my configuration as follows:

JobManager Archive directory:

*jobmanager.archive.fs.dir*:
hdfs:///user/p2epda/lake/delp_qa/flink_hs/

-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs

Found 44282 items

-rw-r- 3 delp datalake_admin_dev  50569 2020-03-21 23:17
/user/p2epda/lake/delp_qa/flink_hs/000144dba9dc0f235768a46b2f26e936

-rw-r- 3 delp datalake_admin_dev  49578 2020-03-03 08:45
/user/p2epda/lake/delp_qa/flink_hs/000347625d8128ee3fd0b672018e38a5

-rw-r- 3 delp datalake_admin_dev  50842 2020-03-24 15:19
/user/p2epda/lake/delp_qa/flink_hs/0004be6ce01ba9677d1eb619ad0fa757

...

...

History Server will fetch the archived jobs from the same location:

*historyserver.archive.fs.dir*:
hdfs:///user/p2epda/lake/delp_qa/flink_hs/

So I’m able to confirm that there are indeed archived applications
that I should be able to view in the history server. I’m not able to
find out what file the overview service is looking for from the
repository – any suggestions as to what I could look into next?

Best,

Andreas




Your Personal Data: We may collect and process information about
you that may be subject to data protection laws. For more
information about how we use and disclose your personal data, how
we protect your information, our legal basis to use your
information, your rights and who you can contact, please refer to:
www.gs.com/privacy-notices 




Your Personal Data: We may collect and process information about you 
that may be subject to data protection laws. For more information 
about how we use and disclose your personal data, how we protect your 
information, our legal basis to use your information, your rights and 
who you can contact, please refer to: www.gs.com/privacy-notices 

Re: When is data later than the watermark discarded?

2020-04-27 Thread Benchao Li
Hi,

Your understanding is correct: only time-related operators may drop late data, the typical examples being Window and CEP.
Ordinary operators such as Map and Filter have no notion of time and will not drop data.
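
To make this concrete, a minimal DataStream sketch (the key/value types, window size and
lateness below are assumptions for illustration): records that arrive later than the
watermark plus the allowed lateness are dropped by the window, but they can be redirected
to a side output instead of being lost silently.

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSketch {

    /** events: (key, value) pairs that already carry event-time timestamps and watermarks. */
    public static DataStream<Tuple2<String, Long>> aggregate(DataStream<Tuple2<String, Long>> events) {
        // Late records beyond the allowed lateness go here instead of being dropped.
        final OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("late") {};

        SingleOutputStreamOperator<Tuple2<String, Long>> result = events
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .allowedLateness(Time.minutes(5))
                .sideOutputLateData(lateTag)
                .sum(1);

        // Inspect or handle the late records separately.
        result.getSideOutput(lateTag).print();

        return result;
    }
}
```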

lec ssmi  wrote on Mon, Apr 27, 2020 at 17:38:

> Hi:
>   For data later than the watermark, is it only time-related operators, such as
> time windows, that automatically filter these records out? Or do other operators,
> such as map and join, also filter them out automatically without processing them?
>  It feels similar to ProcessFunction, which provides a method to get the
> currentWatermark; whether a record is processed or not then depends on your own code logic.
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


When is data later than the watermark discarded?

2020-04-27 Thread lec ssmi
Hi:
  For data later than the watermark, is it only time-related operators, such as time
windows, that automatically filter these records out? Or do other operators, such as
map and join, also filter them out automatically without processing them?
 It feels similar to ProcessFunction, which provides a method to get the
currentWatermark; whether a record is processed or not then depends on your own code logic.


Re: How to disable the state behind `COUNT` sql?

2020-04-27 Thread Benchao Li
Hi,

There is indeed state for the aggregation result; however, we cannot
disable it, as it is by design.
StreamQueryConfig.maxIdleStateRetentionTime can control how long the state
will be kept.
If you can ensure that the time gap between two records of the same id is larger
than, for example,
1 min, then setting the retention time to 1 min can resolve your issue.
If not, maybe you need to change your dimension table, making it return the
count directly instead
of returning the details.
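
To make the retention-time suggestion concrete, here is a minimal sketch against the
Flink 1.9/1.10 Table API (the StreamQueryConfig route mentioned above; method names
should be double-checked against the exact version you run, and the 1 min / 6 min
values are placeholders):

```
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class IdleStateRetention {

    /** Converts an aggregating Table into a retract stream with bounded idle-state retention. */
    public static DataStream<Tuple2<Boolean, Row>> toBoundedRetractStream(
            StreamTableEnvironment tEnv, Table aggregatedResult) {
        StreamQueryConfig queryConfig = tEnv.queryConfig();
        // State that stays idle longer than the minimum retention time becomes
        // eligible for cleanup; the maximum must be noticeably larger than the minimum.
        queryConfig.withIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
        return tEnv.toRetractStream(aggregatedResult, Row.class, queryConfig);
    }
}
```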

izual  wrote on Mon, Apr 27, 2020 at 17:06:

> I implement my DimTable by extending `LookupTableSource`[1], which stores
> data like:
>
> id=1 -> (SH, BJ, SD)
>
> id=2 -> (...)
>
> and then extend `TableFunction` to return the values corresponding to the
> lookup keys, possibly returning multiple rows; for example, when the lookup key
> is id=1, then in `TableFunction.eval`
>
> ```
>
> collect('SH')
>
> collect('BJ')
>
> collect('SD')
>
> ```
>
>
> Now I want to get the region count by id, which comes from tblEvent.id. The
> SQL is:
>
>
> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
> SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
> tblEvent.id
>
>
> I expect the result of COUNT to always be 3 for id = 1, no matter how many
> times id=1 appears.
>
> but the actual result is : 3, 6, 9, ...
>
>
> I think this is because of the state mechanism behind COUNT; how can I turn this off?
>
> Or what's the correct use for this?
> StreamQueryConfig.maxIdleStateRetentionTime or something?
>
>
> The reason not using state in flink:
> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


How to disable the state behind `COUNT` sql?

2020-04-27 Thread izual
I implement my DimTable by extending `LookupTableSource`[1], which stores data
like:

id=1 -> (SH, BJ, SD)

id=2 -> (...)

and then extend `TableFunction` to return the values corresponding to the
lookup keys, possibly returning multiple rows; for example, when the lookup key is id=1,
then in `TableFunction.eval`

```

collect('SH')

collect('BJ')

collect('SD')

```




Now I want to get the region count by id, which comes from tblEvent.id. The SQL is:




SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR SYSTEM 
AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id




I expect the result of COUNT to always be 3 for id = 1, no matter how many times
id=1 appears.

but the actual result is : 3, 6, 9, ...




I think this is because of the state mechanism behind COUNT; how can I turn this off?

Or what's the correct use for this? StreamQueryConfig.maxIdleStateRetentionTime 
or something?




The reason not using state in flink: 
http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E

[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups

Re: Support LRU cache in JDBCLookupFunction

2020-04-27 Thread tao siyuan
好的,谢谢

Benchao Li  于2020年4月27日周一 下午5:00写道:

> 我觉得是可以的。
>
> tao siyuan  于2020年4月27日周一 下午4:24写道:
>
> > 谢谢,
> >
> > 我能否为第二个意见提交一个issue,为connector增加一个JDBCLookupFunction的异步接口
> >
> > Benchao Li  于2020年4月27日周一 下午4:11写道:
> >
> > > Hi,
> > >
> > > 第一个意见现在已经有了一个issue[1]和pr,可以参考一下。
> > > 第二个意见据我所知是有异步维表的接口和实现,但是connector还没有实现。
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-16038
> > >
> > > tao siyuan  于2020年4月27日周一 下午4:00写道:
> > >
> > > > hi,
> > > >
> > > > 不好意思,我忽略了内部使用的Guava cache,
> > > >
> > > > 我这里可以提2个意见吗:
> > > > 1,增加统计缓存命中情况
> > > > 2,增加异步交互模式
> > > >
> > > > Jark Wu  于2020年4月27日周一 下午3:31写道:
> > > >
> > > > > Hi,
> > > > >
> > > > > 目前 jdbc lookup 就是用的 LRU cache。 你是希望 cache 大小能动态调整?
> > > > >
> > > > > > 2020年4月27日 15:24,tao siyuan  写道:
> > > > > >
> > > > > > HI all:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 目前,一些情况下会遇到到流及外部维表Join。而使用JDBCLookupFunction只支持cache固定大小和条数,但是通常,我们可以使用cache
> > > > > > LRU 策略 提高cache使用率以及reduce数据库的交互次数。
> > > > > >
> > > > > > 请问这是一个值得提交的issue吗?
> > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Benchao Li
> > > School of Electronics Engineering and Computer Science, Peking
> University
> > > Tel:+86-15650713730
> > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > >
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: Fault tolerance in Flink file Sink

2020-04-27 Thread Kostas Kloudas
Hi Eyal and Dawid,

@Eyal I think Dawid explained pretty well what is happening and why in
distributed settings, the underlying FS on which the StreamingFileSink
writes has to be durable and accessible to all parallel instances of
the job. Please let us know if you have any further questions.

Cheers,
Kostas

On Mon, Apr 27, 2020 at 9:52 AM Eyal Pe'er  wrote:
>
> Hi Dawid,
> Thanks for the very detailed answer and the correct assumptions (I am using 
> row format).
>
> I tried not using NFS/S3, but it seems like it is the only option I have.
>
> Best regards
>
> Eyal Peer
>
> From: Dawid Wysakowicz 
> Sent: Friday, April 24, 2020 4:20 PM
> To: Eyal Pe'er ; user 
> Subject: Re: Fault tolerance in Flink file Sink
>
>
>
> Hi Eyal,
>
> First of all I would say a local filesystem is not the right choice for what
> you are trying to achieve. I don't think you can achieve a true exactly-once
> policy in this setup. Let me elaborate why.
>
> Let me clarify a bit how the StreamingFileSink works.  The interesting bit is 
> how it behaves on checkpoints. The behavior is controlled by a RollingPolicy. 
> As you have not said what format you use, let's assume you use row format
> first. For a row format the default rolling policy (when to change the file
> from in-progress to pending) is that it will be rolled if the file reaches 128MB,
> the file is older than 60 sec or it has not been written to for 60 sec. It 
> does not roll on a checkpoint. Moreover StreamingFileSink considers the 
> filesystem as a durable sink that can be accessed after a restore. That 
> implies that it will try to append to this file when restoring from 
> checkpoint/savepoint.
>
> Even if you rolled the files on every checkpoint you still might face the 
> problem that you can have some leftovers because the StreamingFileSink moves 
> the files from pending to complete after the checkpoint is completed. If a 
> failure happens between finishing the checkpoint and moving the files it will 
> not be able to move them after a restore (it would do it if it had access).
>
> Lastly a completed checkpoint will contain offsets of records that were 
> processed successfully end-to-end, that means records that are assumed 
> committed by the StreamingFileSink. This can be records written to an 
> in-progress file with a pointer in a StreamingFileSink checkpointed metadata, 
> records in a "pending" file with an entry in a StreamingFileSink checkpointed 
> metadata that this file has been completed or records in "finished" files.[1]
>
> Therefore as you can see there are multiple scenarios when the 
> StreamingFileSink has to access the files after a restart.
>
> Last last thing, you mentioned "committing to the "bootstrap-server". Bear in 
> mind that Flink does not use offsets committed back to Kafka for guaranteeing 
> consistency. It can write those offsets back but only for 
> monitoring/debugging purposes. Flink stores/restores the processed offsets 
> from its checkpoints.[3]
>
> Let me know if it helped. I tried my best ;) BTW I highly encourage reading 
> the linked sources as they try to describe all that in a more structured way.
>
> I am also cc'ing Kostas who knows more about the StreamingFileSink than I 
> do., so he can maybe correct me somewhere.
>
>  Best,
>
> Dawid
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
>
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
>
> [3]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>
> On 23/04/2020 12:11, Eyal Pe'er wrote:
>
> Hi all,
> I am using Flink streaming with Kafka consumer connector (FlinkKafkaConsumer) 
> and file Sink (StreamingFileSink) in a cluster mode with exactly once policy.
>
> The file sink writes the files to the local disk.
>
> I’ve noticed that if a job fails and automatic restart is on, the task 
> managers look for the leftover files from the last failing job (hidden
> files).
>
> Obviously, since the tasks can be assigned to different task managers, this 
> sums up to more failures over and over again.
>
> The only solution I found so far is to delete the hidden files and resubmit 
> the job.
>
> If I get it right (and please correct me if I am wrong), the events in the
> hidden files were not committed to the bootstrap-server, so there is no data 
> loss.
>
>
>
> Is there a way, forcing Flink to ignore the files that were written already? 
> Or maybe there is a better way to implement the solution (maybe somehow with 
> savepoints)?
>
>
>
> Best regards
>
> Eyal Peer
>
>


Re: How can Flink read multiple paths or wildcard files in batch mode?

2020-04-27 Thread Jingsong Li
"all FileInputFormats have to support multiple paths"
如果你有自己的实现,overwrite supportsMultiPaths它为true,几乎所有的实现有是true的。

如果你使用DataStream,需注意了,不能使用StreamExecutionEnvironment.createInput(不支持多路径),需显示使用addSource(new
InputFormatSourceFunction)
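
To make this concrete, a minimal DataSet sketch (the subclass, type info and HDFS paths
are assumptions for illustration; TextInputFormat stands in for your own FileInputFormat
implementation):

```
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public class MultiPathRead {

    // Subclassed only to make sure supportsMultiPaths() returns true,
    // as suggested above; many built-in formats already do.
    private static class MultiPathTextInputFormat extends TextInputFormat {
        MultiPathTextInputFormat(Path path) {
            super(path);
        }

        @Override
        public boolean supportsMultiPaths() {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        MultiPathTextInputFormat format =
                new MultiPathTextInputFormat(new Path("hdfs:///abc/20200401/t1.data"));
        // Glob/enumerate the concrete files first (e.g. via FileSystem.listStatus),
        // then hand all of them to the format at once.
        format.setFilePaths(
                new Path("hdfs:///abc/20200401/t1.data"),
                new Path("hdfs:///abc/20200402/t1.data"));

        DataSet<String> lines = env.createInput(format, BasicTypeInfo.STRING_TYPE_INFO);
        lines.print();
    }
}
```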

Best,
Jingsong Lee

On Mon, Apr 27, 2020 at 3:43 PM 无痕 <95509...@qq.com> wrote:

> Thanks for the reply!
> The application uses Dataset. I checked and FileInputFormat is an abstract class; I see its supportsMultiPaths method is marked Deprecated:
> /**
>  * Override this method to supports multiple paths.
>  * When this method will be removed, all FileInputFormats have to support
> multiple paths.
>  *
>  * @return True if the FileInputFormat supports multiple paths, false
> otherwise.
>  *
>  * @deprecated Will be removed for Flink 2.0.
>  */
> @Deprecated
> public boolean supportsMultiPaths() {
>return false;
> }
>
>
>
>
> -- Original message --
> From: "Jingsong Li"  Sent: Monday, April 27, 2020, 09:29
> To: "user-zh"
> Subject: Re: How can Flink read multiple paths or wildcard files in batch mode?
>
>
>
> Hi,
>
> Are you using Dataset or SQL?
>
> If it is Dataset or Datastream,
> filter the files out first and then FileInputFormat.setFilePaths?
>
> Best,
> Jingsong Lee
>
> On Sun, Apr 26, 2020 at 10:01 PM 无痕 <95509...@qq.com wrote:
>
>  HI ALL :
>  nbsp; nbsp; nbsp;请问下,flink批方式如何读取多路径文件或通配符文件?如下:
>  nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp;/abc/202004*/t1.datanbsp;
>  读2020年4月所有t1.data文件;
>  nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp;/abc/20200401/t*.data
>  读2020年4月1日目录下所有t开头的文件
>  nbsp; nbsp; nbsp;谢谢!
>
>
>
> --
> Best, Jingsong Lee



-- 
Best, Jingsong Lee


Re: Support LRU cache in JDBCLookupFunction

2020-04-27 Thread tao siyuan
Thanks,

Could I open an issue for the second suggestion, adding an asynchronous interface for
JDBCLookupFunction to the connector?

Benchao Li  wrote on Mon, Apr 27, 2020 at 16:11:

> Hi,
>
> For the first suggestion there is already an issue[1] and a PR you can refer to.
> For the second, as far as I know the async lookup (dimension table) interface and
> implementation exist, but the connector has not implemented it yet.
>
> [1] https://issues.apache.org/jira/browse/FLINK-16038
>
> tao siyuan  wrote on Mon, Apr 27, 2020 at 16:00:
>
> > hi,
> >
> > Sorry, I overlooked the Guava cache that is used internally.
> >
> > May I make two suggestions here:
> > 1. add statistics on cache hits
> > 2. add an asynchronous interaction mode
> >
> > Jark Wu  wrote on Mon, Apr 27, 2020 at 15:31:
> >
> > > Hi,
> > >
> > > The JDBC lookup already uses an LRU cache. Are you hoping that the cache size can
> > > be adjusted dynamically?
> > >
> > > > On Apr 27, 2020, at 15:24, tao siyuan  wrote:
> > > >
> > > > HI all:
> > > >
> > > > In some cases we need to join a stream with an external dimension table.
> > > > JDBCLookupFunction only supports a cache with a fixed size and number of
> > > > entries, but usually an LRU cache policy can improve cache utilization and
> > > > reduce the number of round trips to the database.
> > > >
> > > > Is this an issue worth filing?
> > >
> > >
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: Support LRU cache in JDBCLookupFunction

2020-04-27 Thread Benchao Li
Hi,

For the first suggestion there is already an issue[1] and a PR you can refer to.
For the second, as far as I know the async lookup (dimension table) interface and
implementation exist, but the connector has not implemented it yet.

[1] https://issues.apache.org/jira/browse/FLINK-16038

tao siyuan  wrote on Mon, Apr 27, 2020 at 16:00:

> hi,
>
> Sorry, I overlooked the Guava cache that is used internally.
>
> May I make two suggestions here:
> 1. add statistics on cache hits
> 2. add an asynchronous interaction mode
>
> Jark Wu  wrote on Mon, Apr 27, 2020 at 15:31:
>
> > Hi,
> >
> > The JDBC lookup already uses an LRU cache. Are you hoping that the cache size can
> > be adjusted dynamically?
> >
> > > On Apr 27, 2020, at 15:24, tao siyuan  wrote:
> > >
> > > HI all:
> > >
> > > In some cases we need to join a stream with an external dimension table.
> > > JDBCLookupFunction only supports a cache with a fixed size and number of entries,
> > > but usually an LRU cache policy can improve cache utilization and reduce the
> > > number of round trips to the database.
> > >
> > > Is this an issue worth filing?
> >
> >
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Support LRU cache in JDBCLookupFunction

2020-04-27 Thread tao siyuan
hi,

Sorry, I overlooked the Guava cache that is used internally.

May I make two suggestions here:
1. add statistics on cache hits
2. add an asynchronous interaction mode

Jark Wu  wrote on Mon, Apr 27, 2020 at 15:31:

> Hi,
>
> The JDBC lookup already uses an LRU cache. Are you hoping that the cache size can be
> adjusted dynamically?
>
> > On Apr 27, 2020, at 15:24, tao siyuan  wrote:
> >
> > HI all:
> >
> > In some cases we need to join a stream with an external dimension table.
> > JDBCLookupFunction only supports a cache with a fixed size and number of entries,
> > but usually an LRU cache policy can improve cache utilization and reduce the number
> > of round trips to the database.
> >
> > Is this an issue worth filing?
>
>


Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-27 Thread Zhijiang
Thanks Dian for the release work and thanks everyone involved. 

Best,
Zhijiang


--
From:Till Rohrmann 
Send Time:2020 Apr. 27 (Mon.) 15:13
To:Jingsong Li 
Cc:dev ; Leonard Xu ; Benchao Li 
; Konstantin Knauf ; jincheng 
sun ; Hequn Cheng ; Dian Fu 
; user ; user-zh 
; Apache Announce List 
Subject:Re: [ANNOUNCE] Apache Flink 1.9.3 released

Thanks Dian for being our release manager and thanks to everyone who helped
making this release possible.

Cheers,
Till

On Mon, Apr 27, 2020 at 3:26 AM Jingsong Li  wrote:

> Thanks Dian for managing this release!
>
> Best,
> Jingsong Lee
>
> On Sun, Apr 26, 2020 at 7:17 PM Jark Wu  wrote:
>
>> Thanks Dian for being the release manager and thanks all who make this
>> possible.
>>
>> Best,
>> Jark
>>
>> On Sun, 26 Apr 2020 at 18:06, Leonard Xu  wrote:
>>
>> > Thanks Dian for the release and being the release manager !
>> >
>> > Best,
>> > Leonard Xu
>> >
>> >
>> > On Apr 26, 2020, at 17:58, Benchao Li  wrote:
>> >
>> > Thanks Dian for the effort, and all who make this release possible.
>> Great
>> > work!
>> >
>> > Konstantin Knauf  wrote on Sun, Apr 26, 2020 at 17:21:
>> >
>> >> Thanks for managing this release!
>> >>
>> >> On Sun, Apr 26, 2020 at 3:58 AM jincheng sun > >
>> >> wrote:
>> >>
>> >>> Thanks for your great job, Dian!
>> >>>
>> >>> Best,
>> >>> Jincheng
>> >>>
>> >>>
>> >>> Hequn Cheng  wrote on Sat, Apr 25, 2020 at 20:30:
>> >>>
>>  @Dian, thanks a lot for the release and for being the release
>> manager.
>>  Also thanks to everyone who made this release possible!
>> 
>>  Best,
>>  Hequn
>> 
>>  On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:
>> 
>> > Hi everyone,
>> >
>> > The Apache Flink community is very happy to announce the release of
>> > Apache Flink 1.9.3, which is the third bugfix release for the
>> Apache Flink
>> > 1.9 series.
>> >
>> > Apache Flink® is an open-source stream processing framework for
>> > distributed, high-performing, always-available, and accurate data
>> streaming
>> > applications.
>> >
>> > The release is available for download at:
>> > https://flink.apache.org/downloads.html
>> >
>> > Please check out the release blog post for an overview of the
>> > improvements for this bugfix release:
>> > https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>> >
>> > The full release notes are available in Jira:
>> > https://issues.apache.org/jira/projects/FLINK/versions/12346867
>> >
>> > We would like to thank all contributors of the Apache Flink
>> community
>> > who made this release possible!
>> > Also great thanks to @Jincheng for helping finalize this release.
>> >
>> > Regards,
>> > Dian
>> >
>> 
>> >>
>> >> --
>> >> Konstantin Knauf | Head of Product
>> >> +49 160 91394525
>> >>
>> >> Follow us @VervericaData Ververica 
>> >>
>> >> --
>> >> Join Flink Forward  - The Apache Flink
>> >> Conference
>> >> Stream Processing | Event Driven | Real Time
>> >> --
>> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> >> --
>> >> Ververica GmbH
>> >> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> >> (Tony) Cheng
>> >>
>> >
>> >
>> > --
>> >
>> > Benchao Li
>> > School of Electronics Engineering and Computer Science, Peking
>> University
>> > Tel:+86-15650713730
>> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>> >
>> >
>> >
>>
>
>
> --
> Best, Jingsong Lee
>



RE: Fault tolerance in Flink file Sink

2020-04-27 Thread Eyal Pe'er
Hi Dawid,
Thanks for the very detailed answer and the correct assumptions (I am using row 
format).
I tried not using NFS/S3, but it seems like it is the only option I have.
Best regards
Eyal Peer
From: Dawid Wysakowicz 
Sent: Friday, April 24, 2020 4:20 PM
To: Eyal Pe'er ; user 
Subject: Re: Fault tolerance in Flink file Sink


Hi Eyal,

First of all I would say a local filesystem is not the right choice for what you
are trying to achieve. I don't think you can achieve a true exactly-once policy
in this setup. Let me elaborate why.

Let me clarify a bit how the StreamingFileSink works.  The interesting bit is 
how it behaves on checkpoints. The behavior is controlled by a RollingPolicy. 
As you have not said what format you use, let's assume you use row format first.
For a row format the default rolling policy (when to change the file from
in-progress to pending) is that it will be rolled if the file reaches 128MB, the
file is older than 60 sec or it has not been written to for 60 sec. It does not 
roll on a checkpoint. Moreover StreamingFileSink considers the filesystem as a 
durable sink that can be accessed after a restore. That implies that it will 
try to append to this file when restoring from checkpoint/savepoint.
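
For reference, this default behaviour corresponds roughly to the following row-format
configuration (a minimal sketch against the Flink 1.10 StreamingFileSink API; the output
path and checkpoint interval are placeholders, not taken from the thread):

```
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RowFormatSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);   // checkpoints drive the pending -> finished moves

        DataStream<String> stream = env.fromElements("a", "b", "c");

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(128L * 1024 * 1024)  // roll at 128 MB
                                .withRolloverInterval(60_000L)        // or when the file is 60 s old
                                .withInactivityInterval(60_000L)      // or after 60 s without writes
                                .build())
                .build();

        stream.addSink(sink);
        env.execute("row-format sink sketch");
    }
}
```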

Even if you rolled the files on every checkpoint you still might face the 
problem that you can have some leftovers because the StreamingFileSink moves 
the files from pending to complete after the checkpoint is completed. If a 
failure happens between finishing the checkpoint and moving the files it will 
not be able to move them after a restore (it would do it if it had access).

Lastly a completed checkpoint will contain offsets of records that were 
processed successfully end-to-end, that means records that are assumed 
committed by the StreamingFileSink. This can be records written to an 
in-progress file with a pointer in a StreamingFileSink checkpointed metadata, 
records in a "pending" file with an entry in a StreamingFileSink checkpointed 
metadata that this file has been completed or records in "finished" files.[1]

Therefore as you can see there are multiple scenarios when the 
StreamingFileSink has to access the files after a restart.

Last last thing, you mentioned "committing to the "bootstrap-server". Bear in 
mind that Flink does not use offsets committed back to Kafka for guaranteeing 
consistency. It can write those offsets back but only for monitoring/debugging 
purposes. Flink stores/restores the processed offsets from its checkpoints.[3]

Let me know if it helped. I tried my best ;) BTW I highly encourage reading the 
linked sources as they try to describe all that in a more structured way.

I am also cc'ing Kostas who knows more about the StreamingFileSink than I do., 
so he can maybe correct me somewhere.

 Best,

Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

[3]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
On 23/04/2020 12:11, Eyal Pe'er wrote:
Hi all,
I am using Flink streaming with Kafka consumer connector (FlinkKafkaConsumer) 
and file Sink (StreamingFileSink) in a cluster mode with exactly once policy.
The file sink writes the files to the local disk.
I've noticed that if a job fails and automatic restart is on, the task managers
look for the leftover files from the last failing job (hidden files).
Obviously, since the tasks can be assigned to different task managers, this 
sums up to more failures over and over again.
The only solution I found so far is to delete the hidden files and resubmit the 
job.
If I get it right (and please correct me if I am wrong), the events in the hidden
files were not committed to the bootstrap-server, so there is no data loss.

Is there a way, forcing Flink to ignore the files that were written already? Or 
maybe there is a better way to implement the solution (maybe somehow with 
savepoints)?

Best regards
Eyal Peer



Re: How can Flink read multiple paths or wildcard files in batch mode?

2020-04-27 Thread 无痕
Thanks for the reply!
The application uses Dataset. I checked and FileInputFormat is an abstract class; I see its supportsMultiPaths method is marked Deprecated:
/**
 * Override this method to supports multiple paths.
 * When this method will be removed, all FileInputFormats have to support 
multiple paths.
 *
 * @return True if the FileInputFormat supports multiple paths, false otherwise.
 *
 * @deprecated Will be removed for Flink 2.0.
 */
@Deprecated
public boolean supportsMultiPaths() {
   return false;
}




-- Original message --
From: "Jingsong Li"

Re: Support LRU cache in JDBCLookupFunction

2020-04-27 Thread Jark Wu
Hi,

The JDBC lookup already uses an LRU cache. Are you hoping that the cache size can be adjusted dynamically?

> On Apr 27, 2020, at 15:24, tao siyuan  wrote:
> 
> HI all:
> 
> In some cases we need to join a stream with an external dimension table. JDBCLookupFunction only supports a cache with a fixed size and number of entries, but usually an LRU cache policy can improve cache utilization and reduce the number of round trips to the database.
> 
> Is this an issue worth filing?
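
For illustration, a minimal sketch of how that LRU cache is sized in the flink-jdbc
connector (class and method names as in Flink 1.9/1.10, so please verify against your
version; the concrete numbers are placeholders, and the resulting options object is then
handed to the JDBC lookup source / JDBCLookupFunction when the dimension table is registered):

```
import org.apache.flink.api.java.io.jdbc.JDBCLookupOptions;

public class LookupCacheConfig {
    public static void main(String[] args) {
        // The cache is bounded both by size (LRU eviction) and by a TTL.
        JDBCLookupOptions lookupOptions = JDBCLookupOptions.builder()
                .setCacheMaxSize(10_000L)   // keep at most 10k rows
                .setCacheExpireMs(60_000L)  // entries also expire after 60 s
                .setMaxRetryTimes(3)
                .build();
        System.out.println(lookupOptions);
    }
}
```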



Re: Task Assignment

2020-04-27 Thread Marta Paes Moreira
Sorry — I didn't understand you were dealing with multiple keys.

In that case, I'd recommend you read about key-group assignment [1] and
check the KeyGroupRangeAssignment class [2].

Key-groups are assigned to parallel tasks as ranges before the job is
started — this is also a well-defined behaviour in Flink, with implications
in state reassignment on rescaling. I'm afraid that if you try to hardwire
this behaviour into your code, the job might not be transparently
rescalable anymore.

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html

[2]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
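
To see where a given key would land, a small sketch against the class referenced in [2]
(the max-parallelism and example keys below are placeholder values):

```
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupProbe {
    public static void main(String[] args) {
        int maxParallelism = 128;  // Flink's default max parallelism
        int parallelism = 4;       // number of parallel downstream subtasks

        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            // key -> key group -> subtask index, exactly as Flink assigns it at runtime
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);
            System.out.printf("%s -> key group %d -> subtask %d%n", key, keyGroup, subtask);
        }
    }
}
```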


On Fri, Apr 24, 2020 at 7:10 AM Navneeth Krishnan 
wrote:

> Hi Marta,
>
> Thanks for you response. What I'm looking for is something like data
> localization. If I have one TM which is processing a set of keys, I want to
> ensure all keys of the same type go to the same TM rather than using
> hashing to find the downstream slot. I could use a common key to do this
> but I would have to parallelize as much as possible since the number of
> incoming messages is too large to narrow down to a single key and
> processing it.
>
> Thanks
>
> On Thu, Apr 23, 2020 at 2:02 AM Marta Paes Moreira 
> wrote:
>
>> Hi, Navneeth.
>>
>> If you *key* your stream using stream.keyBy(…), this will logically
>> split your input and all the records with the same key will be processed in
>> the same operator instance. This is the default behavior in Flink for keyed
>> streams and transparently handled.
>>
>> You can read more about it in the documentation [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state-and-operator-state
>>
>> On Thu, Apr 23, 2020 at 7:44 AM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Is there a way for an upstream operator to know how the downstream
>>> operator tasks are assigned? Basically I want to group my messages to be
>>> processed on slots in the same node based on some key.
>>>
>>> Thanks
>>>
>>


Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-27 Thread Till Rohrmann
Thanks Dian for being our release manager and thanks to everyone who helped
making this release possible.

Cheers,
Till

On Mon, Apr 27, 2020 at 3:26 AM Jingsong Li  wrote:

> Thanks Dian for managing this release!
>
> Best,
> Jingsong Lee
>
> On Sun, Apr 26, 2020 at 7:17 PM Jark Wu  wrote:
>
>> Thanks Dian for being the release manager and thanks all who make this
>> possible.
>>
>> Best,
>> Jark
>>
>> On Sun, 26 Apr 2020 at 18:06, Leonard Xu  wrote:
>>
>> > Thanks Dian for the release and being the release manager !
>> >
>> > Best,
>> > Leonard Xu
>> >
>> >
>> > On Apr 26, 2020, at 17:58, Benchao Li  wrote:
>> >
>> > Thanks Dian for the effort, and all who make this release possible.
>> Great
>> > work!
>> >
>> > Konstantin Knauf  wrote on Sun, Apr 26, 2020 at 17:21:
>> >
>> >> Thanks for managing this release!
>> >>
>> >> On Sun, Apr 26, 2020 at 3:58 AM jincheng sun > >
>> >> wrote:
>> >>
>> >>> Thanks for your great job, Dian!
>> >>>
>> >>> Best,
>> >>> Jincheng
>> >>>
>> >>>
>> >>> Hequn Cheng  wrote on Sat, Apr 25, 2020 at 20:30:
>> >>>
>>  @Dian, thanks a lot for the release and for being the release
>> manager.
>>  Also thanks to everyone who made this release possible!
>> 
>>  Best,
>>  Hequn
>> 
>>  On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:
>> 
>> > Hi everyone,
>> >
>> > The Apache Flink community is very happy to announce the release of
>> > Apache Flink 1.9.3, which is the third bugfix release for the
>> Apache Flink
>> > 1.9 series.
>> >
>> > Apache Flink® is an open-source stream processing framework for
>> > distributed, high-performing, always-available, and accurate data
>> streaming
>> > applications.
>> >
>> > The release is available for download at:
>> > https://flink.apache.org/downloads.html
>> >
>> > Please check out the release blog post for an overview of the
>> > improvements for this bugfix release:
>> > https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>> >
>> > The full release notes are available in Jira:
>> > https://issues.apache.org/jira/projects/FLINK/versions/12346867
>> >
>> > We would like to thank all contributors of the Apache Flink
>> community
>> > who made this release possible!
>> > Also great thanks to @Jincheng for helping finalize this release.
>> >
>> > Regards,
>> > Dian
>> >
>> 
>> >>
>> >> --
>> >> Konstantin Knauf | Head of Product
>> >> +49 160 91394525
>> >>
>> >> Follow us @VervericaData Ververica 
>> >>
>> >> --
>> >> Join Flink Forward  - The Apache Flink
>> >> Conference
>> >> Stream Processing | Event Driven | Real Time
>> >> --
>> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> >> --
>> >> Ververica GmbH
>> >> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> >> (Tony) Cheng
>> >>
>> >
>> >
>> > --
>> >
>> > Benchao Li
>> > School of Electronics Engineering and Computer Science, Peking
>> University
>> > Tel:+86-15650713730
>> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>> >
>> >
>> >
>>
>
>
> --
> Best, Jingsong Lee
>

