Re: presto s3p checkpoints and local stack

2021-01-28 Thread Arvid Heise
Hi Marco,

ideally you solve everything with IAM roles, but you can also use
credentials providers such as EnvironmentVariableCredentialsProvider[1].

The key should be
s3.aws.credentials.provider:
com.amazonaws.auth.EnvironmentVariableCredentialsProvider

Remember to put the respective jar into the folder of your s3p plugin, the
folder structure should look like described here[2].

Note that this is tested for s3a, so it could be that it works differently
in s3p. I see that presto usually uses presto.s3.credentials-provider.

[1]
https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/EnvironmentVariableCredentialsProvider.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/plugins.html

On Thu, Jan 28, 2021 at 7:00 PM Marco Villalobos 
wrote:

> Is it possible to use an environmental credentials provider?
>
> On Thu, Jan 28, 2021 at 8:35 AM Arvid Heise  wrote:
>
>> Hi Marco,
>>
>> afaik you don't need HADOOP_HOME or core-site.xml.
>>
>> I'm also not sure from where you got your config keys. (I guess from the
>> Presto page, which probably all work if you remove hive., maybe we should
>> also support that)
>>
>> All keys with prefix s3 or s3p (and fs.s3, fs.s3p) are routed towards
>> presto [1].
>>
>> So it should be
>> s3.access-key: XXX
>> s3.secret-key: XXX
>> s3.endpoint: http://aws:4566
>> s3.path-style-access: true
>> s3.path.style.access: true (only one of them is needed, but I don't know
>> which, so please try out)
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/s3.html#configure-access-credentials
>>
>> On Thu, Jan 28, 2021 at 4:58 PM Marco Villalobos <
>> mvillalo...@kineteque.com> wrote:
>>
>>> Hi,
>>>
>>> I got s3a working on localstack. The missing piece of information from
>>> Flink documentation seems to be that the system requires a HADOOP_HOME and
>>> core-site.xml.
>>>
>>> Flink documentation states that s3p (presto) should be used for file
>>> checkpointing into s3. I am using RocksDB, which I assume also means that I
>>> should use s3p (the documentation was not specific about that).  Is that
>>> assumption correct?
>>>
>>> However, I cannot get s3p working now.
>>>
>>> I did the following so far:
>>>
>>> I created the s3-fs-presto plugin directory and copied the jar from the
>>> opt directory there.
>>> I am not sure where to put the configuration keys though.  The
>>> documentation states that I can just put in my flink-conf.yaml, but I had
>>> no success.
>>>
>>> Where do I put the presto configuration keys? Are there any other
>>> missing steps? Is this something that would only work on an EMR environment
>>> with a real HIVE running?
>>>
>>> # The S3 storage endpoint server. This can be used to connect to an
>>> S3-compatible storage
>>> # system instead of AWS. When using v4 signatures, it is recommended to
>>> set this to the
>>> # AWS region-specific endpoint (e.g., http[s]://.s3-.
>>> amazonaws.com).
>>> hive.s3.endpoint: http://aws:4566
>>>
>>> # Use HTTPS to communicate with the S3 API (defaults to true).
>>> hive.s3.ssl.enabled: false
>>>
>>> # Use path-style access for all requests to the S3-compatible storage.
>>> # This is for S3-compatible storage that doesn’t support
>>> virtual-hosted-style access. (defaults to false)
>>> hive.s3.path-style-access: true
>>>
>>> But that also did not work.
>>>
>>> Any advice would be appreciated.
>>>
>>> -Marco Villalobos
>>>
>>


Re: Flink SQL and checkpoints and savepoints

2021-01-28 Thread Dan Hill
I went through a few of the recent Flink Forward videos and didn't see
solutions to this problem.  It sounds like some companies have solutions
but they didn't talk about them in enough detail to do something similar.

On Thu, Jan 28, 2021 at 11:45 PM Dan Hill  wrote:

> Is this savepoint recovery issue also true with the Flink Table API?  I'd
> assume so.  Just doublechecking.
>
> On Mon, Jan 18, 2021 at 1:58 AM Timo Walther  wrote:
>
>> I would check the past Flink Forward conference talks and blog posts. A
>> couple of companies have developed connectors or modified existing
>> connectors to make this work. Usually, based on event timestamps or some
>> external control stream (DataStream API around the actual SQL pipeline
>> for handling this).
>>
>> Also there is FLIP-150 which goes into this direction.
>>
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
>>
>> Regards,
>> Timo
>>
>>
>> On 18.01.21 10:40, Dan Hill wrote:
>> > Thanks Timo!
>> >
>> > The reason makes sense.
>> >
>> > Do any of the techniques make it easy to support exactly once?
>> >
>> > I'm inferring what is meant by dry out.  Are there any documented
>> > patterns for it?  E.g. sending data to new kafka topics between
>> releases?
>> >
>> >
>> >
>> >
>> > On Mon, Jan 18, 2021, 01:04 Timo Walther > > > wrote:
>> >
>> > Hi Dan,
>> >
>> > currently, we cannot provide any savepoint guarantees between
>> releases.
>> > Because of the nature of SQL that abstracts away runtime operators,
>> it
>> > might be that a future execution plan will look completely different
>> > and
>> > thus we cannot map state anymore. This is not avoidable because the
>> > optimizer might get smarter when adding new optimizer rules.
>> >
>> > For such cases, we recommend to dry out the old pipeline and/or
>> warm up
>> > a new pipeline with historic data when upgrading Flink. A change in
>> > columns sometimes works but even this depends on the used operators.
>> >
>> > Regards,
>> > Timo
>> >
>> >
>> > On 18.01.21 04:46, Dan Hill wrote:
>> >  > How well does Flink SQL work with checkpoints and savepoints?  I
>> > tried
>> >  > to find documentation for it in v1.11 but couldn't find it.
>> >  >
>> >  > E.g. what happens if the Flink SQL is modified between releases?
>> > New
>> >  > columns?  Change columns?  Adding joins?
>> >  >
>> >  >
>> >
>>
>>


Re: [ANNOUNCE] Apache Flink 1.10.3 released

2021-01-28 Thread Yu Li
Thanks Xintong for being our release manager and everyone else who made the
release possible!

Best Regards,
Yu


On Fri, 29 Jan 2021 at 15:05, Xintong Song  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.10.3, which is the third bugfix release for the Apache Flink 1.10
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2021/01/29/release-1.10.3.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Xintong Song
>


Re: [ANNOUNCE] Apache Flink 1.10.3 released

2021-01-28 Thread Yu Li
Thanks Xintong for being our release manager and everyone else who made the
release possible!

Best Regards,
Yu


On Fri, 29 Jan 2021 at 15:05, Xintong Song  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.10.3, which is the third bugfix release for the Apache Flink 1.10
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2021/01/29/release-1.10.3.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Xintong Song
>


Re: Flink SQL and checkpoints and savepoints

2021-01-28 Thread Dan Hill
Is this savepoint recovery issue also true with the Flink Table API?  I'd
assume so.  Just doublechecking.

On Mon, Jan 18, 2021 at 1:58 AM Timo Walther  wrote:

> I would check the past Flink Forward conference talks and blog posts. A
> couple of companies have developed connectors or modified existing
> connectors to make this work. Usually, based on event timestamps or some
> external control stream (DataStream API around the actual SQL pipeline
> for handling this).
>
> Also there is FLIP-150 which goes into this direction.
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
>
> Regards,
> Timo
>
>
> On 18.01.21 10:40, Dan Hill wrote:
> > Thanks Timo!
> >
> > The reason makes sense.
> >
> > Do any of the techniques make it easy to support exactly once?
> >
> > I'm inferring what is meant by dry out.  Are there any documented
> > patterns for it?  E.g. sending data to new kafka topics between releases?
> >
> >
> >
> >
> > On Mon, Jan 18, 2021, 01:04 Timo Walther  > > wrote:
> >
> > Hi Dan,
> >
> > currently, we cannot provide any savepoint guarantees between
> releases.
> > Because of the nature of SQL that abstracts away runtime operators,
> it
> > might be that a future execution plan will look completely different
> > and
> > thus we cannot map state anymore. This is not avoidable because the
> > optimizer might get smarter when adding new optimizer rules.
> >
> > For such cases, we recommend to dry out the old pipeline and/or warm
> up
> > a new pipeline with historic data when upgrading Flink. A change in
> > columns sometimes works but even this depends on the used operators.
> >
> > Regards,
> > Timo
> >
> >
> > On 18.01.21 04:46, Dan Hill wrote:
> >  > How well does Flink SQL work with checkpoints and savepoints?  I
> > tried
> >  > to find documentation for it in v1.11 but couldn't find it.
> >  >
> >  > E.g. what happens if the Flink SQL is modified between releases?
> > New
> >  > columns?  Change columns?  Adding joins?
> >  >
> >  >
> >
>
>


Re: Publish heartbeat messages in all Kafka partitions

2021-01-28 Thread Arvid Heise
Hi Alexey,

I don't see a way to do it with one message in FlinkKafkaProducer. So you
have to multiply the heartbeat yourself. I'd imagine the easiest way would
be to query the number of partitions of the output topic (it's static in
Kafka) in the heartbeat producer and already produce as all records.

Best,

Arvid

On Fri, Jan 29, 2021 at 1:13 AM Alexey Trenikhun  wrote:

> Hello,
> We need to publish heartbeat messages in all topic partitions. Is possible
> to produce single message and then somehow broadcast it to all partitions
> from FlinkKafkaProducer? Or only way that message source knows number of
> existing partitions and sends 1 message to 1 partition?
>
> Thanks,
> Alexey
>


Re: Deduplicating record amplification

2021-01-28 Thread Arvid Heise
Hi Rex,

there cannot be any late event in processing time by definition (maybe on a
quantum computer?), so you should be fine. The timestamp of records in
processing time is monotonously increasing.

Best,

Arvid

On Fri, Jan 29, 2021 at 1:14 AM Rex Fenley  wrote:

> Switching to TumblingProcessingTimeWindows seems to have solved that
> problem.
>
> For my own understanding, this won't have any "late" and therefore dropped
> records right? We cannot blindly drop a record from the aggregate
> evaluation, it just needs to take all the records it gets in a window and
> process them and then the aggregate will take whatever is last in-order.
>
> Thanks!
>
> On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley  wrote:
>
>> It looks like it wants me to call assignTimestampsAndWatermarks but I
>> already have a timer on my window which I'd expect everything entering this
>> stream would simply be aggregated during that window
>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>
>> On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley  wrote:
>>
>>> I think I may have been affected by some late night programming.
>>>
>>> Slightly revised how I'm using my aggregate
>>> val userDocsStream =
>>> this.tableEnv
>>> .toRetractStream(userDocsTable, classOf[Row])
>>> .keyBy(_.f1.getField(0))
>>> val compactedUserDocsStream = userDocsStream
>>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>> .aggregate(new CompactionAggregate())
>>> but this now gives me the following exception:
>>> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>>> timestamp marker). Is the time characteristic set to 'ProcessingTime',
>>> or did you forget to call
>>> 'DataStream.assignTimestampsAndWatermarks(...)'?
>>> at org.apache.flink.streaming.api.windowing.assigners.
>>> TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator.processElement(WindowOperator.java:295)
>>> at org.apache.flink.streaming.runtime.tasks.
>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>>> .java:161)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .processElement(StreamTaskNetworkInput.java:178)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:153)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:67)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:351)
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxStep(MailboxProcessor.java:191)
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:181)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:566)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:536)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>> at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>> Which I'm not at all sure how to interpret
>>>
>>> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley  wrote:
>>>
 Ok, that sounds like it confirms my expectations.

 So I tried running my above code and had to slightly edit to using java
 Tuple2 because our execution environment stuff is all in Java.

 class CompactionAggregate
 extends AggregateFunction[
 Tuple2[java.lang.Boolean, Row],
 Tuple2[java.lang.Boolean, Row],
 Tuple2[java.lang.Boolean, Row]
 ] {

 override def createAccumulator() = new Tuple2(false, null)

 // Just take the lastest value to compact.
 override def add(
 value: Tuple2[java.lang.Boolean, Row],
 accumulator: Tuple2[java.lang.Boolean, Row]
 ) =
 value

 override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
 accumulator

 // This is a required function that we don't use.
 override def merge(
 a: Tuple2[java.lang.Boolean, Row],
 b: Tuple2[java.lang.Boolean, Row]
 ) =
 throw new NotImplementedException()
 }

 But when running I get the following error:
 >Caused by: java.lang.RuntimeException: Could not extract key from
 [redacted row]
 >...
 > Caused by: org.apache.flink.table.api.ValidationException:
 Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT'
 kind are supported when converting to an expression.

 I'm googling around and haven't found anything informative about what
 might be causing this issue. Any ideas?

 I'll also take a look at the SQL functions you suggested and see if I
 can use those.

 Thanks!



 On Wed, Jan 27, 2021 at 11:48 PM Arvid 

[ANNOUNCE] Apache Flink 1.10.3 released

2021-01-28 Thread Xintong Song
The Apache Flink community is very happy to announce the release of Apache
Flink 1.10.3, which is the third bugfix release for the Apache Flink 1.10
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2021/01/29/release-1.10.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Xintong Song


退订

2021-01-28 Thread 追梦的废柴



[ANNOUNCE] Apache Flink 1.10.3 released

2021-01-28 Thread Xintong Song
The Apache Flink community is very happy to announce the release of Apache
Flink 1.10.3, which is the third bugfix release for the Apache Flink 1.10
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2021/01/29/release-1.10.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Xintong Song


Re: 提交job的命令,./bin/flink run-application -t yarn-application ... 和 ./bin/flink run -m yarn-cluster ...

2021-01-28 Thread lp
应该说是否:1.11和1.12这里这两种提交方式 是不是一样的,只不过命令有了变化?

官网中的摘录如下:

flink-1.11:
Run a single Flink job on YARN

Example:
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

--
flink-1.12:
Per-Job Cluster Mode

Example:
./bin/flink run -t yarn-per-job --detached
./examples/streaming/TopSpeedWindowing.jar



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: reduce函数的trigger问题

2021-01-28 Thread yang nick
窗口没有结束,所有的数据都还在的

xiaolail...@163.com  于2021年1月29日周五 上午11:27写道:

> 您好!最近刚开始学习flink,问一个关于trigger的问题:
>
> 如下的reduce操作:
> env.socketTextStream("localhost", )
> .flatMap(new Splitter())
> .keyBy(value -> value.f0)
> .window(TumblingEventTimeWindows.of(Time.seconds(15)))
> .reduce(new ReduceFunction>() {
> @Override
> public Tuple2 reduce(Tuple2 Integer> value1, Tuple2 value2) throws Exception {
> return new Tuple2<>(value1.f0, value1.f1 +
> value2.f1);
> }
> });
>
> 使用的trigger是:
> @Override
> public Trigger
> getDefaultTrigger(StreamExecutionEnvironment env) {
> return EventTimeTrigger.create();
> }
>
>
> 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
> 多谢指导!
>
>
>
>
>
> xiaolail...@163.com
>


回复:关于Flink作业的负载监控 task-load指标

2021-01-28 Thread 13051111332
滴滴文章描述如下:
我们一直希望能精确衡量任务的负载状况,使用反压指标指标只能粗略的判断任务的资源够或者不够。
结合新版的 Mailbox 线程模型,所有互斥操作全部运行在 TaskThread 中,只需统计出线程的占用时间,就可以精确计算任务负载的百分比。
未来可以使用指标进行任务的资源推荐,让任务负载维持在一个比较健康的水平
在2021年01月29日 11:59,1305332<1305...@163.com> 写道:


Hi,everyone:
看到滴滴的一篇文章中指出task-load指标,这个线程的占用时间 具体该怎么统计呢?求大神指点

| |
1305332
|
|
1305...@163.com
|
签名由网易邮箱大师定制



关于Flink作业的负载监控 task-load指标

2021-01-28 Thread 13051111332


Hi,everyone:
滴滴的一篇文档中提到:


 "我们一直希望能精确衡量任务的负载状况,使用反压指标指标只能粗略的判断任务的资源够或者不够。结合新版的 
Mailbox 线程模型,所有互斥操作全部运行在 TaskThread 中,只需统计出线程的占用时间,就可以精确计算任务负载的百分比。
未来可以使用指标进行任务的资源推荐,让任务负载维持在一个比较健康的水平”
  关于统计出线程的占用时间,这个具体该怎么做呢?




Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-28 Thread 赵一旦
所有分区无数据,为什么还期望watermark推进呢?目的是啥。貌似没啥需要计算的呀。

LakeShen  于2021年1月28日周四 下午7:42写道:

> 如果是窗口类聚合,可以尝试一下自定义窗口 Trigger
>
> Best,
> LakeShen
>
> 林影  于2021年1月28日周四 下午5:46写道:
>
> > Hi, Jessica.J.Wang
> > 开源flink看起来没这个功能哈,文档翻了一遍没找到
> >
> > Jessica.J.Wang  于2021年1月28日周四 下午5:25写道:
> >
> > > 你使用的是什么窗口呢,是 Tumble或者Hop吗,如果没数据 但是想提前输出结果可以用 emit
> > >
> > >
> >
> https://help.aliyun.com/document_detail/98951.html?spm=5176.11065259.1996646101.searchclickresult.513d1037M5VADa
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > >
> >
>


提交job的命令,./bin/flink run-application -t yarn-application ... 和 ./bin/flink run -m yarn-cluster ...

2021-01-28 Thread lp
如题,在 ./flink --help中看到提交job的命令有两个相似的,貌似都会将jobManager发送yarn
node上去之行,但不明白他们区别,官网也未找到他们的区别,请帮忙解释下他们之间的区别?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: reduce函数的trigger问题

2021-01-28 Thread 赵一旦
此处的trigger你可以理解成是计算结果的输出时机,你的reduce计算还是增量的哈。

xiaolail...@163.com  于2021年1月29日周五 上午11:27写道:

> 您好!最近刚开始学习flink,问一个关于trigger的问题:
>
> 如下的reduce操作:
> env.socketTextStream("localhost", )
> .flatMap(new Splitter())
> .keyBy(value -> value.f0)
> .window(TumblingEventTimeWindows.of(Time.seconds(15)))
> .reduce(new ReduceFunction>() {
> @Override
> public Tuple2 reduce(Tuple2 Integer> value1, Tuple2 value2) throws Exception {
> return new Tuple2<>(value1.f0, value1.f1 +
> value2.f1);
> }
> });
>
> 使用的trigger是:
> @Override
> public Trigger
> getDefaultTrigger(StreamExecutionEnvironment env) {
> return EventTimeTrigger.create();
> }
>
>
> 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
> 多谢指导!
>
>
>
>
>
> xiaolail...@163.com
>


Re: 为什么window的reduceFunction不支持RichFunction呢

2021-01-28 Thread 赵一旦
不是,flink是提供了richReduce,但不支持基于window的richReduce。
基于window的reduce只支持简单reduce,具体需要做自定义状态计算的,按照注释要求使用windowFunction和ProcessWindowFunction去做呀。

一直都是这样,1.12也是的哈。

Kezhu Wang  于2021年1月29日周五 上午11:40写道:

> reduceFunction 是和 ReducingStateDescriptor 配合的, 并不是
> “window的”。“WindowOperator” 的 function 是 “InternalWindowFunction”,这个可以是
> “RichFunction”。
>
> Flink 中的 state 除了可以绑定 key,也可以绑定 namespace,只是 namespace
> 并没有直接暴露给用户。如果需要的话,可以自己写个 WindowOperator,暴露个 WindowFunction。
>
> Interface WindowFunction {
> // You could do incremental aggregation here.
> void processElement(Context context, Window window, Element element);
>
> void fireWindow(Context context, Window window);
> }
>
> interface WindowedRuntimeContext {
>  State getWindowedState(StateDescriptor descriptor).
> }
>
> 把所有 window concepts 交给 WindowOperator,WindowFunction 作具体业务。
>
> On January 28, 2021 at 20:26:47, 赵一旦 (hinobl...@gmail.com) wrote:
>
> 问题如title,当然当前Flink的API还是可以实现RichReduceFunction的效果的,就是基于windowFunction或者processWindowFuntion。
>
> 但是,windowFunc和reduceFunc最大的区别在于windowFunc是窗口触发操作,而reduce是实时的增量操作。
> 如果说我的窗口计算对于每个record的到来都需要一个极复杂的操作,我更希望在reduce中完成,而不是windowFunc中完成,因为那样会导致整个窗口所有key的数据几乎在同一时刻触发,这回导致压力变高。
>
>
>


关于Flink作业的负载监控 task-load指标

2021-01-28 Thread 13051111332


Hi,everyone:
看到滴滴的一篇文章中指出task-load指标,这个线程的占用时间 具体该怎么统计呢?求大神指点

| |
1305332
|
|
1305...@163.com
|
签名由网易邮箱大师定制



Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-01-28 Thread Xintong Song
The ZK client side uses 15s connection timeout and 60s session timeout
in Flink. There's nothing similar to a heartbeat interval configured, which
I assume is up to ZK's internal implementation. These things have not
changed in FLink since at least 2017.

If both ZK client and server complain about timeout, and there's no gc
issue spotted, I would consider a network instability.

Thank you~

Xintong Song



On Fri, Jan 29, 2021 at 3:15 AM Lu Niu  wrote:

> After checking the log I found the root cause is zk client timeout on TM:
> ```
> 2021-01-25 14:01:49,600 WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f
> 2021-01-25 14:01:49,610 INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
> 2021-01-25 14:01:49,711 INFO
> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
> - State change: SUSPENDED
> 2021-01-25 14:01:49,711 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
> 27ac39342913d29baac4cde13062c4a4 with leader id
> b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
> 2021-01-25 14:01:49,712 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
> connection for job 27ac39342913d29baac4cde13062c4a4.
> 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Sink:
> USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
> (d5b5887e639874cb70d7fef939b957b7).
> 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task -
> Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
> (d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: JobManager responsible for
> 27ac39342913d29baac4cde13062c4a4 lost the leadership.
> ```
>
> I checked that TM gc log, no gc issues. it also shows client timeout in
> zookeeper server log. How frequently the zk client sync with server side in
> flink? The log says client doesn't heartbeat to server for 40s. Any help?
> thanks!
>
> Best
> Lu
>
>
> On Thu, Dec 17, 2020 at 6:10 PM Xintong Song 
> wrote:
>
>> I'm not aware of any significant changes to the HA components between
>> 1.9/1.11.
>> Would you mind sharing the complete jobmanager/taskmanager logs?
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Dec 18, 2020 at 8:53 AM Lu Niu  wrote:
>>
>>> Hi, Xintong
>>>
>>> Thanks for replying and your suggestion. I did check the ZK side but
>>> there is nothing interesting. The error message actually shows that only
>>> one TM thought JM lost leadership while others ran fine. Also, this
>>> happened only after we migrated from 1.9 to 1.11. Is it possible this is
>>> introduced by 1.11?
>>>
>>> Best
>>> Lu
>>>
>>> On Wed, Dec 16, 2020 at 5:56 PM Xintong Song 
>>> wrote:
>>>
 Hi Lu,

 I assume you are using ZooKeeper as the HA service?

 A common cause of unexpected leadership lost is the instability of HA
 service. E.g., if ZK does not receive heartbeat from Flink RM for a
 certain period of time, it will revoke the leadership and notify
 other components. You can look into the ZooKeeper logs checking why RM's
 leadership is revoked.

 Thank you~

 Xintong Song



 On Thu, Dec 17, 2020 at 8:42 AM Lu Niu  wrote:

> Hi, Flink users
>
> Recently we migrated to flink 1.11 and see exceptions like:
> ```
> 2020-12-15 12:41:01,199 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtuple (21/60)
> (711d1d319691a4b80e30fe6ab7dfab5b) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50abf386.
> java.lang.Exception: Job leader for job id
> 47b1531f79ffe3b86bc5910f6071e40c lost leadership.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
> ~[nrtg-1.11_deploy.jar:?]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_212-ga]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
> 

Hi??

2021-01-28 Thread Ayesha Johnson
Hi dear friend. how are you doing today and how is business moving? i
contact to buy from your company
kindly send me your latest catalog. also inform me about the
1)Minimum Order Quantity,
2)Delivery time or FOB,
3) payment terms warranty.
Please contact us via email: Your early reply is highly appreciated.
__

Mrs Ayesha Johnson

Sales & Marketing Manager.

National  Emirates Company Limited.

2nd Floor, Office # 205, Al Khabaisi Street, Deira, Dubai
Landmark: Behind Hyundai Showroom

P.O.Box: 97349,
Dubai
Tel: +971 4 7664822

Fax: +971 4 766524.


Re: 为什么window的reduceFunction不支持RichFunction呢

2021-01-28 Thread Kezhu Wang
reduceFunction 是和 ReducingStateDescriptor 配合的, 并不是
“window的”。“WindowOperator” 的 function 是 “InternalWindowFunction”,这个可以是
“RichFunction”。

Flink 中的 state 除了可以绑定 key,也可以绑定 namespace,只是 namespace
并没有直接暴露给用户。如果需要的话,可以自己写个 WindowOperator,暴露个 WindowFunction。

Interface WindowFunction {
// You could do incremental aggregation here.
void processElement(Context context, Window window, Element element);

void fireWindow(Context context, Window window);
}

interface WindowedRuntimeContext {
 State getWindowedState(StateDescriptor descriptor).
}

把所有 window concepts 交给 WindowOperator,WindowFunction 作具体业务。

On January 28, 2021 at 20:26:47, 赵一旦 (hinobl...@gmail.com) wrote:

问题如title,当然当前Flink的API还是可以实现RichReduceFunction的效果的,就是基于windowFunction或者processWindowFuntion。

但是,windowFunc和reduceFunc最大的区别在于windowFunc是窗口触发操作,而reduce是实时的增量操作。
如果说我的窗口计算对于每个record的到来都需要一个极复杂的操作,我更希望在reduce中完成,而不是windowFunc中完成,因为那样会导致整个窗口所有key的数据几乎在同一时刻触发,这回导致压力变高。


Re: 为什么window的reduceFunction不支持RichFunction呢

2021-01-28 Thread Smile
Hi, nobleyd,

请问你是在哪个版本发现 reduceFunction 不支持 RichFunction 呢?
我在1.12 版本试了一下是支持的呀,而且 JavaDoc 里也有 RichReduceFunction 类[1]

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/functions/RichReduceFunction.html





--
Sent from: http://apache-flink.147419.n8.nabble.com/


reduce函数的trigger问题

2021-01-28 Thread xiaolail...@163.com
您好!最近刚开始学习flink,问一个关于trigger的问题:

如下的reduce操作:
env.socketTextStream("localhost", )
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(15)))
.reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});

使用的trigger是:
@Override
public Trigger getDefaultTrigger(StreamExecutionEnvironment 
env) {
return EventTimeTrigger.create();
}

然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
多谢指导!





xiaolail...@163.com


Re: 对于 Flink 可撤回流做聚合,当上游数据撤回时,最终聚合值变小了,怎么解决这种问题。

2021-01-28 Thread Jessica.J.Wang
要看一下具体的Sql 或者具体的算子

下游的 sink needRetract=false的情况,有些场景可以抑制上游算子的回撤,Retract 可以优化成 Upsert



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 使用DataStream API + HBaseRowInputFormat读取HBase时表时,Row arity of from (2) does not match this serializers field length (1)

2021-01-28 Thread Jessica.J.Wang
可以参照一下 HBaseTableSource 里面的实现方法

HBaseTableSchema hbaseSchema = new HBaseTableSchema();
hbaseSchema.addColumn(xxx)
hbaseSchema.setRowKey(xxx);


execEnv.createInput(new HBaseRowInputFormat(conf, tableName, hbaseSchema),
getReturnType())
.name(explainSource());



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:对于 Flink 可撤回流做聚合,当上游数据撤回时,最终聚合值变小了,怎么解决这种问题。

2021-01-28 Thread Michael Ran
以前做过,自定义sink,更新值小于 存储值的时候不更新
在 2021-01-25 16:00:28,"LakeShen"  写道:
>Hi 社区,
>
>之前遇到一个问题,在 Flink 里面,对于一个数据流(能够撤回的)进行聚合时,当这个数据流有大量撤回时,最终聚合值会变小。这种情况之前有看到说加一个
>mini batch 来减轻这种情况的影响,还有别的方法能够解决这个问题吗?
>
>Best,
>LakeShen


Re: 怎么理解 tolerableCheckpointFailureNumber

2021-01-28 Thread Yun Tang
Hi,

tolerableCheckpointFailureNumber 限制的是最大可容忍的连续失败checkpoint计数 
continuousFailureCounter [1],例如将tolerableCheckpointFailureNumber 
设置成3,连续失败3次,continuousFailureCounter 会累计到3,作业就会尝试重启。
如果中间有一个checkpoint成功了,continuousFailureCounter 就会重置为零 [2]。

checkpoint失败后,如果作业没有发生failover,下一次checkpoint还是周期性的触发,并受 
execution.checkpointing.min-pause [3] 等参数的影响。


[1] 
https://github.com/apache/flink/blob/4f5747fa0f7226c780742a4549408a38bc95d052/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java#L51
[2] 
https://github.com/apache/flink/blob/4f5747fa0f7226c780742a4549408a38bc95d052/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java#L161-L171
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#execution-checkpointing-min-pause

祝好
唐云


From: jiangjiguang719 
Sent: Friday, January 29, 2021 9:35
To: user-zh@flink.apache.org 
Subject: 怎么理解 tolerableCheckpointFailureNumber

tolerableCheckpointFailureNumber 是设置可容忍的checkpoint失败次数,具体怎么理解呢?比如 设置为3
1. 当checkpoint 失败时,该值+1,直到 大于 3,实时作业就发生失败或重启?
2. 当checkpoint 失败时,是立即进行下个checkpoint?还是根据周期设置自动触发?
3. 该值是累加值吗


怎么理解 tolerableCheckpointFailureNumber

2021-01-28 Thread jiangjiguang719
tolerableCheckpointFailureNumber 是设置可容忍的checkpoint失败次数,具体怎么理解呢?比如 设置为3
1. 当checkpoint 失败时,该值+1,直到 大于 3,实时作业就发生失败或重启?
2. 当checkpoint 失败时,是立即进行下个checkpoint?还是根据周期设置自动触发?
3. 该值是累加值吗

Re: Initializing broadcast state

2021-01-28 Thread Guowei Ma
Hi Nick
Following is an example(could not run but just to explain the idea). I use
the `KeyedBroadcastProcessFunction` because I saw your code use the
keyedstate.

private static class StatefulFunctionWithKeyedStateAccessedOnBroadcast
extends KeyedBroadcastProcessFunction {

private static final long serialVersionUID = 7496674620398203933L;

private final ListStateDescriptor listStateDesc =
new ListStateDescriptor<>("cache element",
BasicTypeInfo.STRING_TYPE_INFO);



@Override
public void processBroadcastElement(Integer value, Context ctx,
Collector out)
throws Exception {
ctx.applyToKeyedState(
listStateDesc,
new KeyedStateFunction>() {
@Override
public void process(String key, ListState
state) throws Exception {
// do the logical with cache state and broadcat value;
// clear the state
state.clear();
}
});
}

@Override
public void processElement(String value, ReadOnlyContext ctx,
Collector out)
throws Exception {
YourBroadCastState = ctx.getBroadcastState("your broad cast state");
if (YourBroadCastState is empty) {
// cache the element
getRuntimeContext().getListState(listStateDesc).add(value);
} else {
// do your business logic with YourBroadCastState and your value.
}
}
}

Best,
Guowei


On Wed, Jan 27, 2021 at 4:31 AM Nick Bendtner  wrote:

> Thanks a lot Guowei, that makes sense. I will go with the caching
> approach. Can you point me to any example which shows what is the most
> efficient way to cache elements.
> Thanks a ton for your help.
>
> Best,
> Nick
>
> On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma  wrote:
>
>> Hi,Nick
>> I do not think you could update the `myState`  in the
>> `processBroadcastElement`. It is because you need a key before to update
>> the keyedstate. But there is no key in `processBroadcastElement` .
>> Best,
>> Guowei
>>
>>
>> On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner  wrote:
>>
>>> Hi Guowei,
>>> I am not using a keyed broadcast function, I use [1].  My question is,
>>> can a non broadcast state, for instance value state/map state be updated
>>> whenever I get a broadcast event in *processBroadcastElement*. This way
>>> the state updates are consistent since each instance of the task gets the
>>> same broadcast element.
>>>
>>> ```
>>> private MapState myState;
>>>
>>> @Override
>>>public void processElement(InputType value, ReadOnlyContext ctx,
>>> Collector out) throws Exception {
>>>  // Iterate over map state.
>>>myState.iterator().forEach(entry -> ())// Business logic
>>>
>>>// Do things
>>>}
>>>
>>>@Override
>>>public void processBroadcastElement(BroadcastedStateType value,
>>> Context ctx, Collector out) throws Exception {
>>>  // update map state which is not a broadcast state. Same update in
>>> every sub operator
>>>state.put(value.ID(), value.state());   // Update the mapState
>>> with value from broadcast
>>>}
>>>
>>>
>>>@Override
>>>  public void snapshotState(FunctionSnapshotContext context) throws
>>> Exception {
>>>
>>>  // called when it's time to save state
>>>
>>>  myState.clear();
>>>
>>>  // Update myState with current application state
>>>
>>>  }
>>>
>>>  @Override
>>>  public void initializeState(FunctionInitializationContext context)
>>> throws Exception {
>>>
>>>  // called when things start up, possibly recovering from an error
>>>
>>>  descriptor = new MapStateDescriptor<>("state", Types.STRING,
>>> Types.POJO(BroadcastedStateType.class));
>>>
>>>  myState = context.getKeyedStateStore().getMapState(descriptor);
>>>
>>>  if (context.isRestored()) {
>>>
>>>  // restore application state from myState
>>>
>>>  }
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
>>> .
>>>
>>>
>>> Best,
>>> Nick.
>>>
>>> On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma  wrote:
>>>
 Hi,Nick
 Normally you could not iterate all the keyed states, but the
 `BroadCastState` & `applyTokeyedState` could do that.
 For example, before you get the broadcast side elements you might
 choose to cache the non-broadcast element to the keyed state. After the
 broadcast elements arrive you need to use `applyTokeyedState`[1] to iterate
 all the elements you "cached" in the keyed state and do your business 
 logic.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html

 Best,
 Guowei


 On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner 
 wrote:

> Thanks Guowei. Another question I have is, what is the use of a
> broadcast state when I 

Configuring ephemeral storage limits when using Native Kubernetes

2021-01-28 Thread Emilien Kenler
Hello,

I'm trying to run Flink on Kubernetes, and I recently switched from 
lyft/flinkk8soperator to the Flink Native Kubernetes deployment mode.

I have a long running job, that I want to deploy (using application mode), and 
after a few hours, I noticed the deployment was disappearing.
After a quick look at the logs, it seems that the job manager was no longer to 
talk with the task manager after a while, because those were evicted by 
Kubernetes due to using more ephemeral storage than allowed.

We have limit ranges set per namespace with low default value, and each 
application deployed on Kubernetes needs to set values appropriate depending on 
its usage.
I couldn't find a way to configure those via Flink configuration.

Is there a way to set ephemeral storage requests and limits?
Are external resources supposed to help here?
If there is currently no way to do it, should it be added to the scope of 
FLINK-20324 ?

Thanks,
Emilien


Publish heartbeat messages in all Kafka partitions

2021-01-28 Thread Alexey Trenikhun
Hello,
We need to publish heartbeat messages in all topic partitions. Is possible to 
produce single message and then somehow broadcast it to all partitions from 
FlinkKafkaProducer? Or only way that message source knows number of existing 
partitions and sends 1 message to 1 partition?

Thanks,
Alexey


Re: Deduplicating record amplification

2021-01-28 Thread Rex Fenley
Switching to TumblingProcessingTimeWindows seems to have solved that
problem.

For my own understanding, this won't have any "late" and therefore dropped
records right? We cannot blindly drop a record from the aggregate
evaluation, it just needs to take all the records it gets in a window and
process them and then the aggregate will take whatever is last in-order.

Thanks!

On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley  wrote:

> It looks like it wants me to call assignTimestampsAndWatermarks but I
> already have a timer on my window which I'd expect everything entering this
> stream would simply be aggregated during that window
> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>
> On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley  wrote:
>
>> I think I may have been affected by some late night programming.
>>
>> Slightly revised how I'm using my aggregate
>> val userDocsStream =
>> this.tableEnv
>> .toRetractStream(userDocsTable, classOf[Row])
>> .keyBy(_.f1.getField(0))
>> val compactedUserDocsStream = userDocsStream
>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>> .aggregate(new CompactionAggregate())
>> but this now gives me the following exception:
>> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>> timestamp marker). Is the time characteristic set to 'ProcessingTime', or
>> did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>> at org.apache.flink.streaming.api.windowing.assigners.
>> TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator.processElement(WindowOperator.java:295)
>> at org.apache.flink.streaming.runtime.tasks.
>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>> .java:161)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .processElement(StreamTaskNetworkInput.java:178)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .emitNext(StreamTaskNetworkInput.java:153)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:67)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>> StreamTask.java:351)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxStep(MailboxProcessor.java:191)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:181)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:566)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:536)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> Which I'm not at all sure how to interpret
>>
>> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley  wrote:
>>
>>> Ok, that sounds like it confirms my expectations.
>>>
>>> So I tried running my above code and had to slightly edit to using java
>>> Tuple2 because our execution environment stuff is all in Java.
>>>
>>> class CompactionAggregate
>>> extends AggregateFunction[
>>> Tuple2[java.lang.Boolean, Row],
>>> Tuple2[java.lang.Boolean, Row],
>>> Tuple2[java.lang.Boolean, Row]
>>> ] {
>>>
>>> override def createAccumulator() = new Tuple2(false, null)
>>>
>>> // Just take the lastest value to compact.
>>> override def add(
>>> value: Tuple2[java.lang.Boolean, Row],
>>> accumulator: Tuple2[java.lang.Boolean, Row]
>>> ) =
>>> value
>>>
>>> override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>>> accumulator
>>>
>>> // This is a required function that we don't use.
>>> override def merge(
>>> a: Tuple2[java.lang.Boolean, Row],
>>> b: Tuple2[java.lang.Boolean, Row]
>>> ) =
>>> throw new NotImplementedException()
>>> }
>>>
>>> But when running I get the following error:
>>> >Caused by: java.lang.RuntimeException: Could not extract key from
>>> [redacted row]
>>> >...
>>> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
>>> kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
>>> supported when converting to an expression.
>>>
>>> I'm googling around and haven't found anything informative about what
>>> might be causing this issue. Any ideas?
>>>
>>> I'll also take a look at the SQL functions you suggested and see if I
>>> can use those.
>>>
>>> Thanks!
>>>
>>>
>>>
>>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise  wrote:
>>>
 Hi Rex,

 if your keyby (and with join/grouping/windowing) is random or not
 depends on the relationship of the join/grouping key with your Kafka
 partitioning key.

 Say your partitioning key is document_id. Then, any join/grouping key
 that is composed of (or is exactly) document_id, will retain the order. You
 should always ask yourself 

flink checkpoints adjustment strategy

2021-01-28 Thread Marco Villalobos
I am kind of stuck in determining how large a checkpoint interval should be.

Is there a guide for that?  If a timeout time is 10 minutes, we time out,
what is a good strategy for adjusting that?

Where is a good starting point for a checkpoint? How shall they be
adjusted?

We often see checkpoint errors during our onTimer calls, I don't know if
that's related.

Marco A. Villalobos


Flink on Kubernetes, Task/Job Manager Recycles

2021-01-28 Thread Julian Cardarelli (CA)
Hello -

I am running some testing with flink on Kubernetes. Every let's say five to ten 
days, all the jobs disappear from running jobs. There's nothing under completed 
jobs, and there's no record of the submitted jar files in the cluster.

In some manner or another, it is almost like going into a fresh Flink 
installation. And so, I think that's probably what is happening.

Is there a persistent volume or something that needs to be setup to ensure that 
state is maintained between what seems like a pod restart? I'm not clear on 
where to add it based on the docs, if so.

Thank you




___
Julian Cardarelli
CEO
700-184 Front Street East
Toronto, ON, M5A 4N3, Canada
T: (800) 961-1549
E:jul...@thentia.com
DISCLAIMER
​
​Neither Thentia Corporation, nor its directors, officers, shareholders, 
representatives, employees, non-arms length companies, subsidiaries, parent, 
affiliated brands and/or agencies are licensed to provide legal advice. This 
e-mail may contain among other things legal information. We disclaim any and 
all responsibility for the content of this e-mail. YOU MUST NOT rely on any of 
our communications as legal advice. Only a licensed legal professional may give 
you advice. Our communications are never provided as legal advice, because we 
are not licensed to provide legal advice nor do we possess the knowledge, 
skills or capacity to provide legal advice. We disclaim any and all 
responsibility related to any action you might take based upon our 
communications and emphasize the need for you to never rely on our 
communications as the basis of any claim or proceeding.
CONFIDENTIALITY
​
​This email and any files transmitted with it are confidential and intended 
solely for the use of the individual or entity to whom they are addressed. If 
you have received this email in error please notify the system manager. This 
message contains confidential information and is intended only for the 
individual(s) named. If you are not the named addressee(s) you should not 
disseminate, distribute or copy this e-mail. Please notify the sender 
immediately by e-mail if you have received this e-mail by mistake and delete 
this e-mail from your system. If you are not the intended recipient you are 
notified that disclosing, copying, distributing or taking any action in 
reliance on the contents of this information is strictly prohibited.   


[Stateful Functions] Problems with Protobuf Versions

2021-01-28 Thread Jan Brusch

Hi,

I have a bit of a strange problem: I can't get a Statefun Application to 
Compile or Run (Depending on the exact Protobuf version) with a Protobuf 
version newer than 3.3.0. I have had this problem over multiple project 
setups and multiple versions of Flink Statefun with Java8.


Protobuf 3.3.0 works fine and all, but it does seem a bit odd...


The most common error behaviour is a successful maven build and the 
following Runtime Error on Startup:


java.lang.NoClassDefFoundError: 
com/google/protobuf/GeneratedMessageV3$UnusedPrivateParameter



Does anyone else have this Problem or found a solution for this in the past?


Best regards

Jan





--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501



Re: Deduplicating record amplification

2021-01-28 Thread Rex Fenley
It looks like it wants me to call assignTimestampsAndWatermarks but I
already have a timer on my window which I'd expect everything entering this
stream would simply be aggregated during that window
.window(TumblingEventTimeWindows.of(Time.seconds(1)))

On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley  wrote:

> I think I may have been affected by some late night programming.
>
> Slightly revised how I'm using my aggregate
> val userDocsStream =
> this.tableEnv
> .toRetractStream(userDocsTable, classOf[Row])
> .keyBy(_.f1.getField(0))
> val compactedUserDocsStream = userDocsStream
> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
> .aggregate(new CompactionAggregate())
> but this now gives me the following exception:
> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> timestamp marker). Is the time characteristic set to 'ProcessingTime', or
> did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
> at org.apache.flink.streaming.api.windowing.assigners.
> TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:295)
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
> .java:161)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:178)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:153)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:351)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxStep(MailboxProcessor.java:191)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:181)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:566)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:536)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> Which I'm not at all sure how to interpret
>
> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley  wrote:
>
>> Ok, that sounds like it confirms my expectations.
>>
>> So I tried running my above code and had to slightly edit to using java
>> Tuple2 because our execution environment stuff is all in Java.
>>
>> class CompactionAggregate
>> extends AggregateFunction[
>> Tuple2[java.lang.Boolean, Row],
>> Tuple2[java.lang.Boolean, Row],
>> Tuple2[java.lang.Boolean, Row]
>> ] {
>>
>> override def createAccumulator() = new Tuple2(false, null)
>>
>> // Just take the lastest value to compact.
>> override def add(
>> value: Tuple2[java.lang.Boolean, Row],
>> accumulator: Tuple2[java.lang.Boolean, Row]
>> ) =
>> value
>>
>> override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>> accumulator
>>
>> // This is a required function that we don't use.
>> override def merge(
>> a: Tuple2[java.lang.Boolean, Row],
>> b: Tuple2[java.lang.Boolean, Row]
>> ) =
>> throw new NotImplementedException()
>> }
>>
>> But when running I get the following error:
>> >Caused by: java.lang.RuntimeException: Could not extract key from
>> [redacted row]
>> >...
>> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
>> kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
>> supported when converting to an expression.
>>
>> I'm googling around and haven't found anything informative about what
>> might be causing this issue. Any ideas?
>>
>> I'll also take a look at the SQL functions you suggested and see if I can
>> use those.
>>
>> Thanks!
>>
>>
>>
>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise  wrote:
>>
>>> Hi Rex,
>>>
>>> if your keyby (and with join/grouping/windowing) is random or not
>>> depends on the relationship of the join/grouping key with your Kafka
>>> partitioning key.
>>>
>>> Say your partitioning key is document_id. Then, any join/grouping key
>>> that is composed of (or is exactly) document_id, will retain the order. You
>>> should always ask yourself the question: can two records coming from the
>>> ordered Kafka partition X be processed by two different operator instances.
>>> For a join/grouping operator, there is only the strict guarantee that all
>>> records with the same key will be shuffled into the same operator instance.
>>>
>>> Your compaction in general looks good but I'm not deep into Table API.
>>> I'm quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table
>>> API should already do what you want. [1]
>>>
>>> [1]
>>> 

Re: Deduplicating record amplification

2021-01-28 Thread Rex Fenley
I think I may have been affected by some late night programming.

Slightly revised how I'm using my aggregate
val userDocsStream =
this.tableEnv
.toRetractStream(userDocsTable, classOf[Row])
.keyBy(_.f1.getField(0))
val compactedUserDocsStream = userDocsStream
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.aggregate(new CompactionAggregate())
but this now gives me the following exception:
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
timestamp marker). Is the time characteristic set to 'ProcessingTime', or
did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.
TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
.processElement(WindowOperator.java:295)
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
.java:161)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.processElement(StreamTaskNetworkInput.java:178)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:153)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:351)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxStep(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:566)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:829)

Which I'm not at all sure how to interpret

On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley  wrote:

> Ok, that sounds like it confirms my expectations.
>
> So I tried running my above code and had to slightly edit to using java
> Tuple2 because our execution environment stuff is all in Java.
>
> class CompactionAggregate
> extends AggregateFunction[
> Tuple2[java.lang.Boolean, Row],
> Tuple2[java.lang.Boolean, Row],
> Tuple2[java.lang.Boolean, Row]
> ] {
>
> override def createAccumulator() = new Tuple2(false, null)
>
> // Just take the lastest value to compact.
> override def add(
> value: Tuple2[java.lang.Boolean, Row],
> accumulator: Tuple2[java.lang.Boolean, Row]
> ) =
> value
>
> override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
> accumulator
>
> // This is a required function that we don't use.
> override def merge(
> a: Tuple2[java.lang.Boolean, Row],
> b: Tuple2[java.lang.Boolean, Row]
> ) =
> throw new NotImplementedException()
> }
>
> But when running I get the following error:
> >Caused by: java.lang.RuntimeException: Could not extract key from
> [redacted row]
> >...
> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
> supported when converting to an expression.
>
> I'm googling around and haven't found anything informative about what
> might be causing this issue. Any ideas?
>
> I'll also take a look at the SQL functions you suggested and see if I can
> use those.
>
> Thanks!
>
>
>
> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> if your keyby (and with join/grouping/windowing) is random or not depends
>> on the relationship of the join/grouping key with your Kafka partitioning
>> key.
>>
>> Say your partitioning key is document_id. Then, any join/grouping key
>> that is composed of (or is exactly) document_id, will retain the order. You
>> should always ask yourself the question: can two records coming from the
>> ordered Kafka partition X be processed by two different operator instances.
>> For a join/grouping operator, there is only the strict guarantee that all
>> records with the same key will be shuffled into the same operator instance.
>>
>> Your compaction in general looks good but I'm not deep into Table API.
>> I'm quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table
>> API should already do what you want. [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>>
>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley  wrote:
>>
>>> In addition to those questions, assuming that keyed streams are in
>>> order, I've come up with the following solution to compact our records and
>>> only pick the most recent one per id before sending to the ES sink.
>>>
>>> The first item in the Row is the document ID / primary key which we want
>>> to compact records on.

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-01-28 Thread Lu Niu
After checking the log I found the root cause is zk client timeout on TM:
```
2021-01-25 14:01:49,600 WARN
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
session timed out, have not heard from server in 40020ms for sessionid
0x404f9ca531a5d6f
2021-01-25 14:01:49,610 INFO
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
session timed out, have not heard from server in 40020ms for sessionid
0x404f9ca531a5d6f, closing socket connection and attempting reconnect
2021-01-25 14:01:49,711 INFO
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
- State change: SUSPENDED
2021-01-25 14:01:49,711 WARN
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.
2021-01-25 14:01:49,712 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
27ac39342913d29baac4cde13062c4a4 with leader id
b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
2021-01-25 14:01:49,712 WARN
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.
2021-01-25 14:01:49,712 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
connection for job 27ac39342913d29baac4cde13062c4a4.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task -
Attempting to fail task externally Sink:
USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
(d5b5887e639874cb70d7fef939b957b7).
2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task -
Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
(d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for
27ac39342913d29baac4cde13062c4a4 lost the leadership.
```

I checked that TM gc log, no gc issues. it also shows client timeout in
zookeeper server log. How frequently the zk client sync with server side in
flink? The log says client doesn't heartbeat to server for 40s. Any help?
thanks!

Best
Lu


On Thu, Dec 17, 2020 at 6:10 PM Xintong Song  wrote:

> I'm not aware of any significant changes to the HA components between
> 1.9/1.11.
> Would you mind sharing the complete jobmanager/taskmanager logs?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Dec 18, 2020 at 8:53 AM Lu Niu  wrote:
>
>> Hi, Xintong
>>
>> Thanks for replying and your suggestion. I did check the ZK side but
>> there is nothing interesting. The error message actually shows that only
>> one TM thought JM lost leadership while others ran fine. Also, this
>> happened only after we migrated from 1.9 to 1.11. Is it possible this is
>> introduced by 1.11?
>>
>> Best
>> Lu
>>
>> On Wed, Dec 16, 2020 at 5:56 PM Xintong Song 
>> wrote:
>>
>>> Hi Lu,
>>>
>>> I assume you are using ZooKeeper as the HA service?
>>>
>>> A common cause of unexpected leadership lost is the instability of HA
>>> service. E.g., if ZK does not receive heartbeat from Flink RM for a
>>> certain period of time, it will revoke the leadership and notify
>>> other components. You can look into the ZooKeeper logs checking why RM's
>>> leadership is revoked.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, Dec 17, 2020 at 8:42 AM Lu Niu  wrote:
>>>
 Hi, Flink users

 Recently we migrated to flink 1.11 and see exceptions like:
 ```
 2020-12-15 12:41:01,199 INFO
  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
 USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->
 USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtuple (21/60)
 (711d1d319691a4b80e30fe6ab7dfab5b) switched from RUNNING to FAILED on
 org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50abf386.
 java.lang.Exception: Job leader for job id
 47b1531f79ffe3b86bc5910f6071e40c lost leadership.
 at
 org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
 ~[nrtg-1.11_deploy.jar:?]
 at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_212-ga]
 at
 org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
 ~[nrtg-1.11_deploy.jar:?]
 at
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 ~[nrtg-1.11_deploy.jar:?]
 at
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 ~[nrtg-1.11_deploy.jar:?]
 at
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[nrtg-1.11_deploy.jar:?]
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
 [nrtg-1.11_deploy.jar:?]
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
 [nrtg-1.11_deploy.jar:?]
 at 

question on checkpointing

2021-01-28 Thread Marco Villalobos
Is it possible that checkpointing times out due to an operator taking too
long?

Also, does windowing affect the checkpoint barriers?


Re: Timers not firing until stream end

2021-01-28 Thread Pilgrim Beart
Chesnay,
1) Correct, I'd like the timeout event (generated at eventTime==1000) to
appear in its correct time sequence in the output, i.e. before eventTime
exceeds 1000.
It's great that Flink can deal with out-of-orderness, but I didn't expect
it to spontaneously create it (especially with parallelism==1).
In the previous case the timeout is emitted late by 2 seconds (4 events).
So I was wondering - how late could it be?
I dialled down the Duration of the WatermarkGenerator BoundedOutOfOrderness
to 0, and the timeout now only appears *slightly *late, as log output below.
By inserting extra timestamps, I've demonstrated that this is "1 event"
late, rather than "1 second" late.
It's as if the watermark generator realises that time is advancing, so it
triggers the timeout, but only after emitting the event that advanced time?
At least this is feeling deterministic.
Although, relying on the presence of that "forcing" event seems non-ideal -
if there just happens not to be one, due to a gap in other ID streams,
we'll get unbounded latency in our timeouts, which means we can't offer any
downstream systems any out-of-orderness guarantee.

2) Apart from that unbounded latency concern, it's a fair point that if
I'm going to partition the output by ID anyway, this isn't a huge problem.

3) Is there any negative effect of setting the BoundedOutofOrderness
duration to 0? Does it somehow make Flink less efficient?

4) In a subsequent stage, we want to do time-window aggregation (but only
within, not across, IDs). Setting the watermark duration to 0 will make the
window emit immediately. Bu we want data that arrives less than 2 minutes
late not to be considered late, i.e. don't emit any window until the latest
event time is at least 2 minutes after the window end time. Is it possible
to set watermark strategies separately per processing stage?

Thanks again for all your very helpful responses,

{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelled previous timer. "}
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
 // <- Arrives one event too late
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelled previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000
msg_in.ts 4000 Cancelled previous timer. "}

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot

 +44 7961 125282
See our latest features

and book me

for
a video call.



On Thu, 28 Jan 2021 at 12:34, Chesnay Schepler  wrote:

> I'm not sure I see the problem in your output.
>
> For any given key the timestamps are in order, and the events where
> devices are offline seem to occur at the right time.
>
> Is it just that you'd like the following line to occur earlier in the
> output?
>
>  {"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
>
> If so, then I'd just partition the output by key and evaluate them
> individually.
> On 1/28/2021 9:53 AM, Pilgrim Beart wrote:
>
> Scratch that - your WatermarkStrategy DOES work (when I implement it
> correctly!).
> Well, almost: As you can see below (code pushed to repo), the Timer events
> are still appearing somewhat late in the stream - 4 events late in this
> case. It may be just good-enough for my purposes, though it will make
> building test cases painful, so if you have any ideas how I could fix that,
> would be much appreciated.
>
> {"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
> {"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
> 

AW: Stateful Functions - accessing the state aside of normal processing

2021-01-28 Thread Stephan Pelikan
Hi Gordon,

If operating on checkpoints instead of savepoints this might be OK. But since 
this is not in the current scope I digged into Flink docs and found the 
"queryable state" 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html#querying-state).

This sounds good and seems to be a possibility to read the state of a specific 
function by id. This would solve the first part of my challange (examining the 
current state). Additionally there is remote client what makes things easy.

As far as I understand its only necessary to enable this for statefuns. If the 
types like PersistedValue also takes a queryable-name like ValueStateDescriptor 
it could be passed through in places like 
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java#L65.
 Then the state of single jobs could be retrieved if I'm right. But I can only 
query states of a specific statefun by id. Not the total crowd of states.

To get a solution in the "near" future I could send "state changes" egress 
messages and stream them into an ElasticSearch sink. Then I could search that 
ES index the way I like. I only have to check if that works in terms of amount 
of data and throughput. Additionally I'll have to consider how to structure 
those "state changes" events in the ES to be able to query as I need. As a 
give-away I would get historical data of states outdated or cleared.

This sounds like a feasible solution. What do you think?

Cheers,
Stephan


Von: Tzu-Li (Gordon) Tai 
Gesendet: Donnerstag, 28. Jänner 2021 04:06
An: Stephan Pelikan 
Cc: user@flink.apache.org
Betreff: Re: Stateful Functions - accessing the state aside of normal processing

Hi Stephan,

Great to hear about your experience with StateFun so far!

I think what you are looking for is a way to read StateFun checkpoints, which 
are basically an immutable consistent point-in-time snapshot of all the states 
across all your functions, and run some computation or simply to explore the 
state values.
StateFun checkpoints are essentially adopted from Flink, so you can find more 
detail about that here [1].

Currently, StateFun does provide a means for state "bootstrapping": running a 
batch offline job to write and compose a StateFun checkpoint [2].
What is still missing is the "reading / analysis" side of things, to do exactly 
what you described: running a separate batch offline job for reading and 
processing an existing StateFun checkpoint.

Before we dive into details on how that may look like, do you think that is 
what you would need?

Although I don't think we would be able to support such a feature yet since 
we're currently focused on reworking the SDKs and request-reply protocol, in 
any case it would be interesting to discuss if this feature would be important 
for multiple users already.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html#checkpointing
[2] 
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/deployment-and-operations/state-bootstrap.html

On Wed, Jan 27, 2021 at 11:41 PM Stephan Pelikan 
mailto:stephan.peli...@phactum.at>> wrote:
Hi,

We are trying to use Statefuns for our tool and it seems to be a good fit. I 
already adopted it and it works quite well. However, we have millions of 
different states (all the same FunctionType but different ids) and each state 
consists of several @Persisted values (values and tables). We want to build an 
administration tool for examining the crowd of states (count, histogram, etc.) 
and each state in detail (the persisted-tables and -values).

Additionally we need some kind of dig-down functionality for finding those 
individual states. For example some of those persisted values can be used to 
categorize the crowd of states.

My question now is how to achieve this. Is there a way to browse and examine 
statefuns in a read-only fashion (their ids, their persisted values)? How can 
one achieve this without duplicating status in e.g. a relational database?

Thanks,
Stephan

PS: I have another questions but I will send them in separate mails to avoid 
mixing up topics.


Re: presto s3p checkpoints and local stack

2021-01-28 Thread Marco Villalobos
Is it possible to use an environmental credentials provider?

On Thu, Jan 28, 2021 at 8:35 AM Arvid Heise  wrote:

> Hi Marco,
>
> afaik you don't need HADOOP_HOME or core-site.xml.
>
> I'm also not sure from where you got your config keys. (I guess from the
> Presto page, which probably all work if you remove hive., maybe we should
> also support that)
>
> All keys with prefix s3 or s3p (and fs.s3, fs.s3p) are routed towards
> presto [1].
>
> So it should be
> s3.access-key: XXX
> s3.secret-key: XXX
> s3.endpoint: http://aws:4566
> s3.path-style-access: true
> s3.path.style.access: true (only one of them is needed, but I don't know
> which, so please try out)
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/s3.html#configure-access-credentials
>
> On Thu, Jan 28, 2021 at 4:58 PM Marco Villalobos <
> mvillalo...@kineteque.com> wrote:
>
>> Hi,
>>
>> I got s3a working on localstack. The missing piece of information from
>> Flink documentation seems to be that the system requires a HADOOP_HOME and
>> core-site.xml.
>>
>> Flink documentation states that s3p (presto) should be used for file
>> checkpointing into s3. I am using RocksDB, which I assume also means that I
>> should use s3p (the documentation was not specific about that).  Is that
>> assumption correct?
>>
>> However, I cannot get s3p working now.
>>
>> I did the following so far:
>>
>> I created the s3-fs-presto plugin directory and copied the jar from the
>> opt directory there.
>> I am not sure where to put the configuration keys though.  The
>> documentation states that I can just put in my flink-conf.yaml, but I had
>> no success.
>>
>> Where do I put the presto configuration keys? Are there any other missing
>> steps? Is this something that would only work on an EMR environment with a
>> real HIVE running?
>>
>> # The S3 storage endpoint server. This can be used to connect to an
>> S3-compatible storage
>> # system instead of AWS. When using v4 signatures, it is recommended to
>> set this to the
>> # AWS region-specific endpoint (e.g., http[s]://.s3-.
>> amazonaws.com).
>> hive.s3.endpoint: http://aws:4566
>>
>> # Use HTTPS to communicate with the S3 API (defaults to true).
>> hive.s3.ssl.enabled: false
>>
>> # Use path-style access for all requests to the S3-compatible storage.
>> # This is for S3-compatible storage that doesn’t support
>> virtual-hosted-style access. (defaults to false)
>> hive.s3.path-style-access: true
>>
>> But that also did not work.
>>
>> Any advice would be appreciated.
>>
>> -Marco Villalobos
>>
>


Very slow recovery from Savepoint

2021-01-28 Thread Yordan Pavlov
Hello there,
I am trying to find the solution for a problem we are having in our Flink
setup related to very slow recovery from a Savepoint. I have searched in the
mailing list, found a somewhat similar problem, the bottleneck there was the
HD usage, but I am not seeing this in our case. Here is a description of
what our setup is:
* Flink 1.11.3
* Running on top of Kubernetes on dedicated hardware.
* The Flink job consists of 4 task manager running on separate Kubernetes
pods along with a Jobmanager also running on separate Pod.
* We use RocksDB state backend with incremental checkpointing.
* The size of the savepoint I try to recover is around 35 GB
* The file system that RocksDB uses is S3, or more precisely a S3
emulation (Minio), we are not subject to any EBS burst credits and so
on.

The time it takes for the Flink job to be operational and start consuming
new records is around 5 hours. During that time I am not seeing any heavy
resource usage on any of the TaskManager pods. I am attaching a
screenshot of the resources of one of the Taskmanager pods.


In this graph the job was started at around 14:00 o'clock. There is this
huge spike shortly after this and then there is not much happening. This
goes on for around 5 hours after which the job starts, but again working
quite slowly. What would be the way to profile where the bottleneck
is? I have checked my network connectivity and I am able to download
the whole savepoint for several minutes manually. It seems like Flink
is very slow to build its internal state but then again the CPU is not
being utilized. I would be grateful for any suggestions on how to
proceed with this investigation.

Regards,
Yordan


Connect to schema registry via SSL

2021-01-28 Thread Laurent Exsteens
Hello,

I'm trying to us Flink SQL (on Ververica Platform, so no other options than
pure Flink SQL) to read confluent avro messages from Kafka, when the schema
registry secured via SSL.

Would you know what are the correct properties to setup in the kafka
consumer config?

The following options work for a simple java kafka producer/consumer (not a
Flink job):
- schema.registry.ssl.truststore.location
- schema.registry.ssl.truststore.password
- schema.registry.ssl.keystore.location
- schema.registry.ssl.keystore.password

However, they don't seem to be taken into account in my query (and also not
when I tried in a Flink job), even when prefixed by 'properties.'.

I'm using Flink 1.11 for the SQL query (Ververica Platform 2.3), and Flink
1.10 on my job.

Would you have an idea how can I tell my Flink SQL Kafka Connector how to
connect to that SR via SSL? Or a normal Flink job?

Thanks in advance for your help.

Best Regards,

Laurent.


-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu *

*research.euranova.eu* 

-- 
♻ Be green, keep it on the screen


Re: presto s3p checkpoints and local stack

2021-01-28 Thread Arvid Heise
Hi Marco,

afaik you don't need HADOOP_HOME or core-site.xml.

I'm also not sure from where you got your config keys. (I guess from the
Presto page, which probably all work if you remove hive., maybe we should
also support that)

All keys with prefix s3 or s3p (and fs.s3, fs.s3p) are routed towards
presto [1].

So it should be
s3.access-key: XXX
s3.secret-key: XXX
s3.endpoint: http://aws:4566
s3.path-style-access: true
s3.path.style.access: true (only one of them is needed, but I don't know
which, so please try out)

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/s3.html#configure-access-credentials

On Thu, Jan 28, 2021 at 4:58 PM Marco Villalobos 
wrote:

> Hi,
>
> I got s3a working on localstack. The missing piece of information from
> Flink documentation seems to be that the system requires a HADOOP_HOME and
> core-site.xml.
>
> Flink documentation states that s3p (presto) should be used for file
> checkpointing into s3. I am using RocksDB, which I assume also means that I
> should use s3p (the documentation was not specific about that).  Is that
> assumption correct?
>
> However, I cannot get s3p working now.
>
> I did the following so far:
>
> I created the s3-fs-presto plugin directory and copied the jar from the
> opt directory there.
> I am not sure where to put the configuration keys though.  The
> documentation states that I can just put in my flink-conf.yaml, but I had
> no success.
>
> Where do I put the presto configuration keys? Are there any other missing
> steps? Is this something that would only work on an EMR environment with a
> real HIVE running?
>
> # The S3 storage endpoint server. This can be used to connect to an
> S3-compatible storage
> # system instead of AWS. When using v4 signatures, it is recommended to
> set this to the
> # AWS region-specific endpoint (e.g., http[s]://.s3-.
> amazonaws.com).
> hive.s3.endpoint: http://aws:4566
>
> # Use HTTPS to communicate with the S3 API (defaults to true).
> hive.s3.ssl.enabled: false
>
> # Use path-style access for all requests to the S3-compatible storage.
> # This is for S3-compatible storage that doesn’t support
> virtual-hosted-style access. (defaults to false)
> hive.s3.path-style-access: true
>
> But that also did not work.
>
> Any advice would be appreciated.
>
> -Marco Villalobos
>


Re: What causes a buffer pool exception? How can I mitigate it?

2021-01-28 Thread Arvid Heise
>
> Regarding the try catch block


Sorry I meant the try catch in SensorMessageToSensorTimeSeriesFunction.

Also, just to be clear, does disabling restart make it easier for you to
> debug?
>
Yes the log will be quite small then. Currently, it's just repeating the
same things a couple of times.

Btw if you also have a second taskmanager, that log would be even more
interesting. So best to attach all logs (JM + TMs).


On Thu, Jan 28, 2021 at 4:24 PM Marco Villalobos 
wrote:

> Regarding the try catch block, it rethrows the exception.  Here is the
> code:
>
> catch (RuntimeException e) {
> logger.error("Error in timer.", e);
> throw e;
> }
>
> That would be okay, right?
>
> Also, just to be clear, does disabling restart make it easier for you to
> debug?
>
> On Thu, Jan 28, 2021 at 1:17 AM Arvid Heise  wrote:
>
>> Hi Marco,
>>
>> In general, sending a compressed log to ML is totally fine. You can
>> further minimize the log by disabling restarts.
>> I looked into the logs that you provided.
>>
>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>>  [] - Attempting to cancel task forward fill -> (Sink: tag
>>> db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2)
>>> (8c1f256176fb89f112c27883350a02bc).
>>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>>[] - forward fill -> (Sink: tag db sink, Sink: back fill
>>> db sink, Sink: min max step db sink) (2/2)
>>> (8c1f256176fb89f112c27883350a02bc) switched from RUNNING to CANCELING.
>>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>>[] - Triggering cancellation of task code forward fill
>>> -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink)
>>> (2/2) (8c1f256176fb89f112c27883350a02bc).
>>> 2021-01-26 04:37:43,282 ERROR
>>> xx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction []
>>> - Error in timer.
>>> java.lang.RuntimeException: Buffer pool is destroyed.
>>>
>>
>> I can see that my suspicion is most likely correct: It first tries to
>> cancel the task for some reason and then a later timer will show you the
>> respective error. I created the ticket to resolve the issue [1]. There may
>> also be an issue of swalled interruption exceptions, which we are looking
>> into in parallel.
>>
>> However, there is a reason why the task is canceling in the first place
>> and we need to find that. I recommend to not have a try-catch block around
>> *collector.collect* in *ForwardFillKeyedProcessFunction*. Just have it
>> around your user code but not around system calls. This may swallow the
>> real cause.
>>
>> Are you executing the code in IDE? You may be able to set some
>> breakpoints to quickly figure out what's going wrong (I can help then).
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21181
>>
>> On Wed, Jan 27, 2021 at 8:54 AM Arvid Heise  wrote:
>>
>>> Hi Marco,
>>>
>>> could you share your full task manager and job manager log? We
>>> double-checked and saw that the buffer pool is only released on
>>> cancellation or shutdown.
>>>
>>> So I'm assuming that there is another issue (e.g., Kafka cluster not
>>> reachable) and there is a race condition while shutting down. It seems like
>>> the buffer pool exception is shadowing the actual cause then for yet
>>> unknown reasons (this is an issue on its own, but you should be able to see
>>> the actual issue in task manager log).
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <
>>> mvillalo...@kineteque.com> wrote:
>>>
 Actually, the log I sent in my previous message, shows the only error
 that occurred before the buffer pool was destroyed. That
 intermittent warning:

 2021-01-26 04:14:33,140 WARN
  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
 Committing offsets to Kafka takes longer than the checkpoint interval.
 Skipping commit of previous offsets because newer complete checkpoint
 offsets are available. This does not compromise Flink's checkpoint
 integrity.
 2021-01-26 04:14:33,143 INFO
  org.apache.kafka.clients.FetchSessionHandler [] -
 [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
 request (sessionId=936633685, epoch=1) to node 2: {}.
 org.apache.kafka.common.errors.DisconnectException: null

 I know that probably doesn't help much. Sorry.

 On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise  wrote:

> Hi Marco,
>
> the network buffer pool is destroyed when the task manager is
> shutdown. Could you check if you have an error before that in your log?
>
> It seems like the timer is triggered at a point where it shouldn't.
> I'll check if there is a known issue that has been fixed in later 
> versions.
> Do you have the option to upgrade to 1.11.3?
>
> Best,
>
> Arvid
>
> 

Re: HybridMemorySegment index out of bounds exception

2021-01-28 Thread Timo Walther
FYI: Yuval and I scheduled a call to investigate this serialization 
issue remotely on Monday. If you have any idea by looking at the code 
beforehand, let us know.



On 28.01.21 16:57, Yuval Itzchakov wrote:

Hi Timo,

The code example I posted doesn't really match the code that is causing 
this issue. I tried to extend it a bit but couldn't make the 
reproduction work there.
I am no longer using the serialized strings, but registering the custom 
serializers with the runtime during bootstrap and overriding 
getTypeInference to provide the raw data type.


But again, I disabled the custom serializer for the test to make sure it 
is not the one causing the issues.


Regarding FLINK-20986 
, I'm not sure but I 
am no longer using the old type system so everything should pass through 
InternalTypeInfo and RawType. I don't see any type equality issues, and 
I see the same serializer being invoked for both serialization and 
deserialization.


On Thu, Jan 28, 2021 at 5:51 PM Timo Walther > wrote:


This is helpful information. So I guess the problem must be in the
flink-table module and not in flink-core. I will try to reserve some
time tomorrow to look into the code again. How did you express
RawType(Array[String])? Again with fully serialized type string?

Could it be related to
https://issues.apache.org/jira/browse/FLINK-20986
 ?

Regards,
Timo


On 28.01.21 16:30, Yuval Itzchakov wrote:
 > Hi Timo,
 >
 > I tried replacing it with an ordinary ARRAY DataType, which
 > doesn't reproduce the issue.
 > If I use a RawType(Array[String]), the problem still manifests, so I
 > assume it's not directly related to a Kryo serialization of the
specific
 > underlying type (io.circe.Json), but something in the way it
interacts
 > with BinaryRawValueData and writing out to the network buffer
behind the
 > scenes.
 >
 > On Thu, Jan 28, 2021 at 5:26 PM Timo Walther mailto:twal...@apache.org>
 > >> wrote:
 >
 >     Hi Yuval,
 >
 >     we should definitely find the root cause of this issue. It
helps if the
 >     exception happens frequently to nail down the problem.
 >
 >     Have you tried to replace the JSON object with a regular
String? If the
 >     exception is gone after this change. I believe it must be the
 >     serialization and not the network stack.
 >
 >     Regards,
 >     Timo
 >
 >
 >     On 28.01.21 10:29, Yuval Itzchakov wrote:
 >      > Hi,
 >      >
 >      > I previously wrote about a problem I believed was caused
by Kryo
 >      > serialization
 >      >
 >   
  (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E 
 >   
  >

 >
 >      >
 >   
  
 >   
  >>),

 >
 >      > which I am no longer sure is the case.
 >      >
 >      > I have a job which involves a TableScan via a custom
source operator
 >      > which generates a DataStream[RowData], a UDF to parse out a
 >     String =>
 >      > io.circe.Json object (which internally flows as a
 >     RAW('io.circe.Json')
 >      > data-type), and then an AggregateFunction with a
java.util.List
 >      > accumulator which returns one of these objects and is used
in a
 >     tumbling
 >      > window as follows:
 >      >
 >      >      SELECT any_json_array_value(parse_array(resources)) as
 >     resources_sample
 >      >      FROM foo
 >      >      GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
 >      >
 >      > It generates the following physical plan:
 >      >
 >      > optimize result:
 >      > Sink(table=[catalog.default-db.foo],
fields=[resources_sample])
 >      > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$,
event_time,
  

presto s3p checkpoints and local stack

2021-01-28 Thread Marco Villalobos
Hi,

I got s3a working on localstack. The missing piece of information from
Flink documentation seems to be that the system requires a HADOOP_HOME and
core-site.xml.

Flink documentation states that s3p (presto) should be used for file
checkpointing into s3. I am using RocksDB, which I assume also means that I
should use s3p (the documentation was not specific about that).  Is that
assumption correct?

However, I cannot get s3p working now.

I did the following so far:

I created the s3-fs-presto plugin directory and copied the jar from the opt
directory there.
I am not sure where to put the configuration keys though.  The
documentation states that I can just put in my flink-conf.yaml, but I had
no success.

Where do I put the presto configuration keys? Are there any other missing
steps? Is this something that would only work on an EMR environment with a
real HIVE running?

# The S3 storage endpoint server. This can be used to connect to an
S3-compatible storage
# system instead of AWS. When using v4 signatures, it is recommended to set
this to the
# AWS region-specific endpoint (e.g., http[s]://.s3-.
amazonaws.com).
hive.s3.endpoint: http://aws:4566

# Use HTTPS to communicate with the S3 API (defaults to true).
hive.s3.ssl.enabled: false

# Use path-style access for all requests to the S3-compatible storage.
# This is for S3-compatible storage that doesn’t support
virtual-hosted-style access. (defaults to false)
hive.s3.path-style-access: true

But that also did not work.

Any advice would be appreciated.

-Marco Villalobos


Re: HybridMemorySegment index out of bounds exception

2021-01-28 Thread Yuval Itzchakov
Hi Timo,

The code example I posted doesn't really match the code that is causing
this issue. I tried to extend it a bit but couldn't make the reproduction
work there.
I am no longer using the serialized strings, but registering the custom
serializers with the runtime during bootstrap and overriding
getTypeInference to provide the raw data type.

But again, I disabled the custom serializer for the test to make sure it is
not the one causing the issues.

Regarding FLINK-20986 ,
I'm not sure but I am no longer using the old type system so everything
should pass through InternalTypeInfo and RawType. I don't see any type
equality issues, and I see the same serializer being invoked for both
serialization and deserialization.

On Thu, Jan 28, 2021 at 5:51 PM Timo Walther  wrote:

> This is helpful information. So I guess the problem must be in the
> flink-table module and not in flink-core. I will try to reserve some
> time tomorrow to look into the code again. How did you express
> RawType(Array[String])? Again with fully serialized type string?
>
> Could it be related to https://issues.apache.org/jira/browse/FLINK-20986 ?
>
> Regards,
> Timo
>
>
> On 28.01.21 16:30, Yuval Itzchakov wrote:
> > Hi Timo,
> >
> > I tried replacing it with an ordinary ARRAY DataType, which
> > doesn't reproduce the issue.
> > If I use a RawType(Array[String]), the problem still manifests, so I
> > assume it's not directly related to a Kryo serialization of the specific
> > underlying type (io.circe.Json), but something in the way it interacts
> > with BinaryRawValueData and writing out to the network buffer behind the
> > scenes.
> >
> > On Thu, Jan 28, 2021 at 5:26 PM Timo Walther  > > wrote:
> >
> > Hi Yuval,
> >
> > we should definitely find the root cause of this issue. It helps if
> the
> > exception happens frequently to nail down the problem.
> >
> > Have you tried to replace the JSON object with a regular String? If
> the
> > exception is gone after this change. I believe it must be the
> > serialization and not the network stack.
> >
> > Regards,
> > Timo
> >
> >
> > On 28.01.21 10:29, Yuval Itzchakov wrote:
> >  > Hi,
> >  >
> >  > I previously wrote about a problem I believed was caused by Kryo
> >  > serialization
> >  >
> > (
> https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E
> > <
> https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E
> >
> >
> >  >
> > <
> https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E
> > <
> https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E
> >>),
> >
> >  > which I am no longer sure is the case.
> >  >
> >  > I have a job which involves a TableScan via a custom source
> operator
> >  > which generates a DataStream[RowData], a UDF to parse out a
> > String =>
> >  > io.circe.Json object (which internally flows as a
> > RAW('io.circe.Json')
> >  > data-type), and then an AggregateFunction with a java.util.List
> >  > accumulator which returns one of these objects and is used in a
> > tumbling
> >  > window as follows:
> >  >
> >  >  SELECT any_json_array_value(parse_array(resources)) as
> > resources_sample
> >  >  FROM foo
> >  >  GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
> >  >
> >  > It generates the following physical plan:
> >  >
> >  > optimize result:
> >  > Sink(table=[catalog.default-db.foo], fields=[resources_sample])
> >  > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$,
> event_time,
> >  > 360)], select=[any_json_array_value($f1) AS resources_sample])
> >  > +- Exchange(distribution=[single])
> >  >+- Calc(select=[event_time, parse_array(resources) AS $f1])
> >  >   +- WatermarkAssigner(rowtime=[event_time],
> > watermark=[event_time])
> >  >  +- TableSourceScan(table=[[catalog, default-db,
> foo]],
> >  > fields=[resources])
> >  >
> >  > When I run my job, I receive the following exception after 10 - 30
> >  > seconds (it varies, which gives me a hunch this is related to
> > some race
> >  > condition that might be happening):
> >  >
> >  > Caused by: java.io.IOException: Can't get next record for channel
> >  > InputChannelInfo{gateIdx=0, inputChannelIdx=0}
> >  > at
> >  > org.apache.flink.streaming.runtime.io
> >  >.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
> >  > at
> >  > org.apache.flink.streaming.runtime.io
> >  

Re: HybridMemorySegment index out of bounds exception

2021-01-28 Thread Timo Walther
This is helpful information. So I guess the problem must be in the 
flink-table module and not in flink-core. I will try to reserve some 
time tomorrow to look into the code again. How did you express 
RawType(Array[String])? Again with fully serialized type string?


Could it be related to https://issues.apache.org/jira/browse/FLINK-20986 ?

Regards,
Timo


On 28.01.21 16:30, Yuval Itzchakov wrote:

Hi Timo,

I tried replacing it with an ordinary ARRAY DataType, which 
doesn't reproduce the issue.
If I use a RawType(Array[String]), the problem still manifests, so I 
assume it's not directly related to a Kryo serialization of the specific 
underlying type (io.circe.Json), but something in the way it interacts 
with BinaryRawValueData and writing out to the network buffer behind the 
scenes.


On Thu, Jan 28, 2021 at 5:26 PM Timo Walther > wrote:


Hi Yuval,

we should definitely find the root cause of this issue. It helps if the
exception happens frequently to nail down the problem.

Have you tried to replace the JSON object with a regular String? If the
exception is gone after this change. I believe it must be the
serialization and not the network stack.

Regards,
Timo


On 28.01.21 10:29, Yuval Itzchakov wrote:
 > Hi,
 >
 > I previously wrote about a problem I believed was caused by Kryo
 > serialization
 >

(https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E



 >

>),

 > which I am no longer sure is the case.
 >
 > I have a job which involves a TableScan via a custom source operator
 > which generates a DataStream[RowData], a UDF to parse out a
String =>
 > io.circe.Json object (which internally flows as a
RAW('io.circe.Json')
 > data-type), and then an AggregateFunction with a java.util.List
 > accumulator which returns one of these objects and is used in a
tumbling
 > window as follows:
 >
 >      SELECT any_json_array_value(parse_array(resources)) as
resources_sample
 >      FROM foo
 >      GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
 >
 > It generates the following physical plan:
 >
 > optimize result:
 > Sink(table=[catalog.default-db.foo], fields=[resources_sample])
 > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time,
 > 360)], select=[any_json_array_value($f1) AS resources_sample])
 >     +- Exchange(distribution=[single])
 >        +- Calc(select=[event_time, parse_array(resources) AS $f1])
 >           +- WatermarkAssigner(rowtime=[event_time],
watermark=[event_time])
 >              +- TableSourceScan(table=[[catalog, default-db, foo]],
 > fields=[resources])
 >
 > When I run my job, I receive the following exception after 10 - 30
 > seconds (it varies, which gives me a hunch this is related to
some race
 > condition that might be happening):
 >
 > Caused by: java.io.IOException: Can't get next record for channel
 > InputChannelInfo{gateIdx=0, inputChannelIdx=0}
 > at
 > org.apache.flink.streaming.runtime.io

.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
 > at
 > org.apache.flink.streaming.runtime.io

.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 > at
 >

org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
 > at
 >

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
 > at
 >

org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
 > at
 >

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
 > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
 > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
 > at java.lang.Thread.run(Thread.java:748)
 > Caused by: java.io.IOException: Serializer consumed more bytes
than the
 > record had. This indicates broken serialization. If you are using
custom
 > serialization types (Value or Writable), check their serialization
 > methods. If you are using a Kryo-serialized type, check the
 > corresponding Kryo serializer.
 > at
 > org.apache.flink.runtime.io

.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
 > at
 

Re: HybridMemorySegment index out of bounds exception

2021-01-28 Thread Yuval Itzchakov
Hi Timo,

I tried replacing it with an ordinary ARRAY DataType, which doesn't
reproduce the issue.
If I use a RawType(Array[String]), the problem still manifests, so I assume
it's not directly related to a Kryo serialization of the specific
underlying type (io.circe.Json), but something in the way it interacts with
BinaryRawValueData and writing out to the network buffer behind the scenes.

On Thu, Jan 28, 2021 at 5:26 PM Timo Walther  wrote:

> Hi Yuval,
>
> we should definitely find the root cause of this issue. It helps if the
> exception happens frequently to nail down the problem.
>
> Have you tried to replace the JSON object with a regular String? If the
> exception is gone after this change. I believe it must be the
> serialization and not the network stack.
>
> Regards,
> Timo
>
>
> On 28.01.21 10:29, Yuval Itzchakov wrote:
> > Hi,
> >
> > I previously wrote about a problem I believed was caused by Kryo
> > serialization
> > (
> https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E
> > <
> https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E>),
>
> > which I am no longer sure is the case.
> >
> > I have a job which involves a TableScan via a custom source operator
> > which generates a DataStream[RowData], a UDF to parse out a String =>
> > io.circe.Json object (which internally flows as a RAW('io.circe.Json')
> > data-type), and then an AggregateFunction with a java.util.List
> > accumulator which returns one of these objects and is used in a tumbling
> > window as follows:
> >
> >  SELECT any_json_array_value(parse_array(resources)) as
> resources_sample
> >  FROM foo
> >  GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
> >
> > It generates the following physical plan:
> >
> > optimize result:
> > Sink(table=[catalog.default-db.foo], fields=[resources_sample])
> > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time,
> > 360)], select=[any_json_array_value($f1) AS resources_sample])
> > +- Exchange(distribution=[single])
> >+- Calc(select=[event_time, parse_array(resources) AS $f1])
> >   +- WatermarkAssigner(rowtime=[event_time],
> watermark=[event_time])
> >  +- TableSourceScan(table=[[catalog, default-db, foo]],
> > fields=[resources])
> >
> > When I run my job, I receive the following exception after 10 - 30
> > seconds (it varies, which gives me a hunch this is related to some race
> > condition that might be happening):
> >
> > Caused by: java.io.IOException: Can't get next record for channel
> > InputChannelInfo{gateIdx=0, inputChannelIdx=0}
> > at
> > org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
> > at
> > org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.IOException: Serializer consumed more bytes than the
> > record had. This indicates broken serialization. If you are using custom
> > serialization types (Value or Writable), check their serialization
> > methods. If you are using a Kryo-serialized type, check the
> > corresponding Kryo serializer.
> > at
> > org.apache.flink.runtime.io
> .network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
> > at
> > org.apache.flink.runtime.io
> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
> > at
> > org.apache.flink.runtime.io
> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
> > at
> > org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
> > ... 8 more
> > Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019,
> > length: 546153590, index: 43, offset: 0
> > at
> >
> org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
> > at
> > org.apache.flink.runtime.io
> .network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
> > at
> > org.apache.flink.runtime.io
> .network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
> > at
> >
> 

Re: HybridMemorySegment index out of bounds exception

2021-01-28 Thread Timo Walther

Hi Yuval,

we should definitely find the root cause of this issue. It helps if the 
exception happens frequently to nail down the problem.


Have you tried to replace the JSON object with a regular String? If the 
exception is gone after this change. I believe it must be the 
serialization and not the network stack.


Regards,
Timo


On 28.01.21 10:29, Yuval Itzchakov wrote:

Hi,

I previously wrote about a problem I believed was caused by Kryo 
serialization 
(https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E 
), 
which I am no longer sure is the case.


I have a job which involves a TableScan via a custom source operator 
which generates a DataStream[RowData], a UDF to parse out a String => 
io.circe.Json object (which internally flows as a RAW('io.circe.Json') 
data-type), and then an AggregateFunction with a java.util.List 
accumulator which returns one of these objects and is used in a tumbling 
window as follows:


     SELECT any_json_array_value(parse_array(resources)) as resources_sample
     FROM foo
     GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)

It generates the following physical plan:

optimize result:
Sink(table=[catalog.default-db.foo], fields=[resources_sample])
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 
360)], select=[any_json_array_value($f1) AS resources_sample])

    +- Exchange(distribution=[single])
       +- Calc(select=[event_time, parse_array(resources) AS $f1])
          +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
             +- TableSourceScan(table=[[catalog, default-db, foo]], 
fields=[resources])


When I run my job, I receive the following exception after 10 - 30 
seconds (it varies, which gives me a hunch this is related to some race 
condition that might be happening):


Caused by: java.io.IOException: Can't get next record for channel 
InputChannelInfo{gateIdx=0, inputChannelIdx=0}
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Serializer consumed more bytes than the 
record had. This indicates broken serialization. If you are using custom 
serialization types (Value or Writable), check their serialization 
methods. If you are using a Kryo-serialized type, check the 
corresponding Kryo serializer.
at 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)

... 8 more
Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019, 
length: 546153590, index: 43, offset: 0
at 
org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
at 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at 
org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
at 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)

... 11 more
Caused 

Re: What causes a buffer pool exception? How can I mitigate it?

2021-01-28 Thread Marco Villalobos
Regarding the try catch block, it rethrows the exception.  Here is the code:

catch (RuntimeException e) {
logger.error("Error in timer.", e);
throw e;
}

That would be okay, right?

Also, just to be clear, does disabling restart make it easier for you to
debug?

On Thu, Jan 28, 2021 at 1:17 AM Arvid Heise  wrote:

> Hi Marco,
>
> In general, sending a compressed log to ML is totally fine. You can
> further minimize the log by disabling restarts.
> I looked into the logs that you provided.
>
> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>  [] - Attempting to cancel task forward fill -> (Sink: tag
>> db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2)
>> (8c1f256176fb89f112c27883350a02bc).
>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>  [] - forward fill -> (Sink: tag db sink, Sink: back fill
>> db sink, Sink: min max step db sink) (2/2)
>> (8c1f256176fb89f112c27883350a02bc) switched from RUNNING to CANCELING.
>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>  [] - Triggering cancellation of task code forward fill ->
>> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink)
>> (2/2) (8c1f256176fb89f112c27883350a02bc).
>> 2021-01-26 04:37:43,282 ERROR
>> xx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction []
>> - Error in timer.
>> java.lang.RuntimeException: Buffer pool is destroyed.
>>
>
> I can see that my suspicion is most likely correct: It first tries to
> cancel the task for some reason and then a later timer will show you the
> respective error. I created the ticket to resolve the issue [1]. There may
> also be an issue of swalled interruption exceptions, which we are looking
> into in parallel.
>
> However, there is a reason why the task is canceling in the first place
> and we need to find that. I recommend to not have a try-catch block around
> *collector.collect* in *ForwardFillKeyedProcessFunction*. Just have it
> around your user code but not around system calls. This may swallow the
> real cause.
>
> Are you executing the code in IDE? You may be able to set some breakpoints
> to quickly figure out what's going wrong (I can help then).
>
> [1] https://issues.apache.org/jira/browse/FLINK-21181
>
> On Wed, Jan 27, 2021 at 8:54 AM Arvid Heise  wrote:
>
>> Hi Marco,
>>
>> could you share your full task manager and job manager log? We
>> double-checked and saw that the buffer pool is only released on
>> cancellation or shutdown.
>>
>> So I'm assuming that there is another issue (e.g., Kafka cluster not
>> reachable) and there is a race condition while shutting down. It seems like
>> the buffer pool exception is shadowing the actual cause then for yet
>> unknown reasons (this is an issue on its own, but you should be able to see
>> the actual issue in task manager log).
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <
>> mvillalo...@kineteque.com> wrote:
>>
>>> Actually, the log I sent in my previous message, shows the only error
>>> that occurred before the buffer pool was destroyed. That
>>> intermittent warning:
>>>
>>> 2021-01-26 04:14:33,140 WARN
>>>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
>>> Committing offsets to Kafka takes longer than the checkpoint interval.
>>> Skipping commit of previous offsets because newer complete checkpoint
>>> offsets are available. This does not compromise Flink's checkpoint
>>> integrity.
>>> 2021-01-26 04:14:33,143 INFO
>>>  org.apache.kafka.clients.FetchSessionHandler [] -
>>> [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
>>> request (sessionId=936633685, epoch=1) to node 2: {}.
>>> org.apache.kafka.common.errors.DisconnectException: null
>>>
>>> I know that probably doesn't help much. Sorry.
>>>
>>> On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise  wrote:
>>>
 Hi Marco,

 the network buffer pool is destroyed when the task manager is shutdown.
 Could you check if you have an error before that in your log?

 It seems like the timer is triggered at a point where it shouldn't.
 I'll check if there is a known issue that has been fixed in later versions.
 Do you have the option to upgrade to 1.11.3?

 Best,

 Arvid

 On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <
 mvillalo...@kineteque.com> wrote:

> Hi.  What causes a buffer pool exception? How can I mitigate it? It is
> causing us plenty of problems right now.
>
> 2021-01-26 04:14:33,041 INFO
>  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
> Subtask 1 received completion notification for checkpoint with id=4.
> 2021-01-26 04:14:33,140 WARN
>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
> Committing offsets to Kafka takes longer than the checkpoint interval.
> 

Question

2021-01-28 Thread Abu Bakar Siddiqur Rahman Rocky
Hi,

Is there any library to use and remember the apache flink snapshot?

Thank you

--
Regards,
Abu Bakar Siddiqur Rahman


Re: key group from xx to yy does not contain zz异常

2021-01-28 Thread restart
感谢老师解答,keyBy的执行逻辑看来我理解的太肤浅了。随机数生成逻辑在keyBy前通过map赋值到具体字段,保证后续keyby时稳定,应该就对了。再次感谢老师指点迷津。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Timers not firing until stream end

2021-01-28 Thread Chesnay Schepler

I'm not sure I see the problem in your output.

For any given key the timestamps are in order, and the events where 
devices are offline seem to occur at the right time.


Is it just that you'd like the following line to occur earlier in the 
output?


{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}

If so, then I'd just partition the output by key and evaluate them 
individually.


On 1/28/2021 9:53 AM, Pilgrim Beart wrote:
Scratch that - your WatermarkStrategy DOES work (when I implement it 
correctly!).
Well, almost: As you can see below (code pushed to repo), the Timer 
events are still appearing somewhat late in the stream - 4 events late 
in this case. It may be just good-enough for my purposes, though it 
will make building test cases painful, so if you have any ideas how I 
could fix that, would be much appreciated.


{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 
msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 
msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 
1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 
1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 
2000 msg_in.ts 3000 Cancelled previous timer. "}

{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 
2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 
3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 
3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 
4000 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 
msg_in.ts 5000 Cancelled previous timer. "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 
5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 
5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 
6000 msg_in.ts 7000 Cancelled previous timer. "}

{"ts":5000,"id":"1","is_online":false,"log":"timestamp is 5000"}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 
4000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 
6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 
7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 
7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 
7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 
8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 
8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 
8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":1,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 
9000 msg_in.ts 1 Cancelled previous timer. "}
{"ts":1,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 
9000 msg_in.ts 1 Cancelled previous timer. "}
{"ts":1,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 
9000 msg_in.ts 1 Cancelled previous timer. "}

{"ts":11000,"id":"1","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"2","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"0","is_online":false,"log":"timestamp is 11000"}
-Pilgrim
--
Learn more at https://devicepilot.com  
@devicepilot 
 
 +44 7961 125282
See our latest features 
 
and book me 

使用DataStream API + HBaseRowInputFormat读取HBase时表时,Row arity of from (2) does not match this serializers field length (1)

2021-01-28 Thread automths
Hi:
您好,我在使用DataStream API 
读取HBase表时,使用了HBaseRowInputFormat,并根据HBaseTableSchema了schema,代码如下:


val env = StreamExecutionEnvironment.getExecutionEnvironment
val hbaseTableSchema = TableSchema.builder()
  .add(TableColumn.of("id", DataTypes.STRING()))
  .add(TableColumn.of("f1", DataTypes.ROW(DataTypes.FIELD("value", 
DataTypes.STRING()
  .build()
val schema = HBaseTableSchema.fromTableSchema(hbaseTableSchema)


val ds: DataStream[Row] = env.createInput(new HBaseRowInputFormat(
  hbaseConfig(),
  tabelName,
  schema
))
ds.print()
env.execute(this.getClass.getSimpleName)
运行时报了如下错误:

 java.lang.RuntimeException: Row arity of from (2) does not match this 
serializers field length (1).
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:113)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:58)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)




找到了原因是HBaseRowInputFormat源码中:
@Override
public TypeInformation getProducedType() {
// split the fieldNames
String[] famNames = schema.getFamilyNames();
TypeInformation[] typeInfos = new TypeInformation[famNames.length];
int i = 0;
for (String family : famNames) {
typeInfos[i] = new RowTypeInfo(
schema.getQualifierTypes(family),
schema.getQualifierNames(family));
i++;
}
return new RowTypeInfo(typeInfos, famNames);
}
此处在构建TypeInformation时,没有加入rowkey的类型


所以这是一个bug吗?




祝好!
automths





为什么window的reduceFunction不支持RichFunction呢

2021-01-28 Thread 赵一旦
问题如title,当然当前Flink的API还是可以实现RichReduceFunction的效果的,就是基于windowFunction或者processWindowFuntion。
但是,windowFunc和reduceFunc最大的区别在于windowFunc是窗口触发操作,而reduce是实时的增量操作。
如果说我的窗口计算对于每个record的到来都需要一个极复杂的操作,我更希望在reduce中完成,而不是windowFunc中完成,因为那样会导致整个窗口所有key的数据几乎在同一时刻触发,这回导致压力变高。


Cannot access state from a empty taskmanager - using kubernetes

2021-01-28 Thread Daniel Peled
Hi,

We have followed the instructions in the following link ""Enabling
Queryable State" with kubernetes:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html#enabling-queryable-state



*When the replicas of the task-manager pods is 1 we get NO errorBut when
the replicas is greater than 1 for example 7 we get the following error
when trying to access flink state:We think it might be related to jira
issue *FLINK-10225  *that
has been abandoned*

java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed
request 1.
 Caused by:
org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could
not retrieve location of state=stopJobValueState of
job=d5d14923157f5c3d3c4b2e1b7c02a942. Potential reasons are: i) the state
is not ready, or ii) the job does not exist.
 Caused by:
org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could
not retrieve location of state=stopJobValueState of
job=d5d14923157f5c3d3c4b2e1b7c02a942. Potential reasons are: i) the state
is not ready, or ii) the job does not exist.

at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:247)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:164)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:131)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:121)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:258)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)


BR,
Danny


Re: key group from xx to yy does not contain zz异常

2021-01-28 Thread Yun Tang
Hi,

原因是你的key selector引入了随机变量 (也就是下面的方法keyBy),导致其select出来的key不是固定的

public KeySelector keyBy(int parallelism) {
return value -> 
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(), 
ThreadLocalRandom.current().nextInt(parallelism));
}

例如原先的key selector选出的key是 key-A,经过取模得到的key group是44,理应将该record发送给下游key 
group包含44的task,但是相关record进入到对应group的task之后,在加入到timer队列的时候,还会再次进行group的计算,由于你的key
 selector有随机性,导致这次选出的key可能是key-B,而针对key-B的取模运算得到的key group是4,也就有可能不在你的task (key 
group 44-45) 中了,导致了最终的异常。

祝好
唐云

From: restart 
Sent: Thursday, January 28, 2021 17:54
To: user-zh@flink.apache.org 
Subject: key group from xx to yy does not contain zz异常

线上部署flink项目时,启动爆如下错误,在测试环境可正常启动,job依赖的flink版本是1.10,flink
集群版本是1.12-SNAPSHOT,与线上一致。有点摸不着头脑,麻烦各位老师帮分析分析
堆栈信息:
java.lang.IllegalArgumentException: key group from 44 to 45 does not contain
4
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:187)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:182)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:176)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:112)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:217)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:884)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:42)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:898)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:399)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:567)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)

代码逻辑大致:
DataStream stream = dataStream
.keyBy(keyBy(globalParallelism))
.window(window(downsampling))
.reduce(reduce(trackerType), processWindow(trackerType),
TypeInformation.of(Metrics.class))
.keyBy(secondKeyBy())

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.reduce(reduce(trackerType),
processSecondWindow(trackerType), TypeInformation.of(Metrics.class))
.rebalance()
.addSink(sink())
.setParallelism(globalParallelism/2);

public KeySelector keyBy(int parallelism) {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),ThreadLocalRandom.current().nextInt(parallelism));
}

public KeySelector secondKeyBy() {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),
value.getWindowEnd());
}
备注:二次keyby的原因是为了解决数据倾斜问题,第一个keyby用来基于EventTime的翻滚窗口,第二个keyby使用了基于processTime的session窗口



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: flink-1.12 通过-t指定模式后无法指定yarn参数

2021-01-28 Thread Yapor
好的 感谢!
在 2021-01-28 15:52:36,"silence"  写道:
>flink1.12后所有的yarn相关的参数通过-D进行指定
>例:-D yarn.application.name=xxx 替代以前的-ynm xxx
>更多配置参考文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Problem restirng state

2021-01-28 Thread Shridhar Kulkarni
All,

We are getting the exception, copied at the end of this post. The exception
is thrown when a new flink job is submitted; when Flink tries to restore
the previous state.

Environment:
Flink version: 1.10.1
State persistence: Hadoop 3.3
Zookeeper 3.5.8
Parallelism: 4

The code implements DataStream Transformation functions: ProcessFunction ->
KeySelector -> ProcessFunction
Inbound messages are partitioned by key "sourceId" which is a part of the
exception stack trace. SourceId is String type and is unique.
---
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.IndexOutOfBoundsException: Index: 109, Size: 10
Serialization trace:
sourceId (com.contineo.ext.flink.core.ThingState)
---

We have overridden
"org.apache.flink.streaming.api.functions.ProcessFunction.open()" method
Any help is appreciated


Exception stack trace:

2021-01-19 19:59:56,934 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint
triggering task Source: Custom Source -> Process -> Process (3/4) of job
c957f40043721b5cab3161991999a7ed is not in state RUNNING but DEPLOYING
instead. Aborting checkpoint.
2021-01-19 19:59:57,358 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Process ->
Sink: Unnamed (4/4) (b2605627c2fffc83dd412b3e7565244d) switched from
RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for
LegacyKeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(4/4) from any
of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore heap backend
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.IndexOutOfBoundsException: Index: 109, Size: 10
Serialization trace:
sourceId (com.contineo.ext.flink.core.ThingState)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
at
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 15 more
Caused by: 

Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-28 Thread LakeShen
如果是窗口类聚合,可以尝试一下自定义窗口 Trigger

Best,
LakeShen

林影  于2021年1月28日周四 下午5:46写道:

> Hi, Jessica.J.Wang
> 开源flink看起来没这个功能哈,文档翻了一遍没找到
>
> Jessica.J.Wang  于2021年1月28日周四 下午5:25写道:
>
> > 你使用的是什么窗口呢,是 Tumble或者Hop吗,如果没数据 但是想提前输出结果可以用 emit
> >
> >
> https://help.aliyun.com/document_detail/98951.html?spm=5176.11065259.1996646101.searchclickresult.513d1037M5VADa
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-28 Thread Sebastián Magrí
Applied that parameter and that seems to get me some progress here.

I still get the shade overlapping classes warning, but I get the
PostgreSQLTableFactory in the merged table.factories.Factory service file.

However, now on runtime the application fails to find the debezium source
function class coming down to this error:

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot load user class:
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.

The class is indeed in jar, though.

Any thougths?

On Thu, 28 Jan 2021 at 09:57, Jark Wu  wrote:

> Hi Sebastián,
>
> Could you try to add combine.children="append" attribute to the
> transformers configuration?
> You can also see the full shade plugin configuration here [1].
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/#transform-table-connectorformat-resources
>
> On Thu, 28 Jan 2021 at 17:28, Sebastián Magrí 
> wrote:
>
>> Hi Jark!
>>
>> Please find the full pom file attached.
>>
>> Best Regards,
>>
>> On Thu, 28 Jan 2021 at 03:21, Jark Wu  wrote:
>>
>>> Hi Sebastián,
>>>
>>> I think Dawid is right.
>>>
>>> Could you share the pom file? I also tried to
>>> package flink-connector-postgres-cdc with ServicesResourceTransformer, and
>>> the Factory file contains
>>>
>>> com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory
>>>
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Tue, 26 Jan 2021 at 21:17, Sebastián Magrí 
>>> wrote:
>>>
 Thanks a lot for looking into it Dawid,

 In the
 src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 file I only see

 org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory

 Even after applying the ServicesResourceTransformer.


 On Tue, 26 Jan 2021 at 11:58, Dawid Wysakowicz 
 wrote:

> Hi,
>
> Unfortunately I am not familiar with the packaging of
> flink-connector-postgres-cdc. Maybe @Jark could help here?
>
> However, I think the problem that you cannot find the connector is
> caused because of lack of entry in the resulting Manifest file. If there
> are overlapping classes maven does not exclude whole dependencies, but
> rather picks the overlapping class from one of the two. Could you check if
> you see entries for all tables in
> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.
>
> If not, you could try applying the ServicesResourceTransformer[1]
>
> Best,
>
> Dawid
>
> [1]
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> On 26/01/2021 12:29, Sebastián Magrí wrote:
>
> Hi!
>
> I've reported an issue with the postgresql-cdc connector apparently
> caused by the maven shade plugin excluding either the JDBC connector or 
> the
> cdc connector due to overlapping classes. The issue for reference is here:
>
> https://github.com/ververica/flink-cdc-connectors/issues/90
>
> In the meantime, however, I've been trying to figure out if I can set
> up an exclusion rule to fix this in my pom.xml file, without success.
>
> The `org.postgresql:postgresql` dependency is being added manually by
> me to have a sink on a postgresql table and injected by the cdc connector
> seemingly via its debezium connector dependency.
>
> Any guidance or hints I could follow would be really appreciated.
>
> --
> Sebastián Ramírez Magrí
>
>

 --
 Sebastián Ramírez Magrí

>>>
>>
>> --
>> Sebastián Ramírez Magrí
>>
>

-- 
Sebastián Ramírez Magrí


关于flink-shaded-xxx的问题

2021-01-28 Thread 赵一旦
如题,我想知道flink shade了多个包,比如jackson,guava等。
其目的是(1)flink用到这些,为了避免冲突所以shade。还是(2)flink推荐用户直接使用flink shade好的这些包?

如上,我想知道是否“推荐”用户直接使用flink
shade的这些包。还是我们自己去依赖自己的包,比如我当前就用到了jackson,以及guava(我直接用了最新的30-jre的版本)。


Apache Flink Job Manager High CPU with Couchbase

2021-01-28 Thread VINAYA KUMAR BENDI
Hello,
We work in a multinational company that produces diesel engines and is working 
on an IoT platform to analyze engine performance based on sensor data. We are 
using Flink for deploying analytics stream processing jobs. We recently 
integrated these jobs with Couchbase (serving as a Cache) and are monitoring 
the performance of these jobs in our test environment.
Flink Cluster
Two Job Managers (2 cpu, 8 GB, Centos 7, 50 GB Disk, Apache Flink 1.9.0)
Six Task Managers (4 cpu, 16 GB, Centos 7, 50 GB Disk, Apache Flink 1.9.0)
Couchbase Cluster
Two nodes (4 cpu, 16 GB, Amazon Linux 2, 25 GB Disk, Community Edition 6.5.1 
build 6299)
Couchbase SDK
java-client (3.0.10)
We noticed that after re-deploying our analytics Flink jobs, high CPU usage 
alert is seen on Flink Job Manager.
  PID USER  PR  NIVIRTRESSHR S  %CPU %MEM TIME+ COMMAND
1831 centos20   0   11.3g   5.8g   6076 S 162.5 77.3  17181:30 java
It was observed that the Job Manager process (1831) comprised of 60.9% (873 out 
of 1433) threads related to couchbase timers of some sort (e.g. cb-timer-1-1, 
cb-events, cb-tracing-1, cb-orphan-1 etc.). This may be the reason for high CPU 
usage.
$ ps -eT | grep 1831 | grep -i cb | wc -l
873

$ ps -eT | grep 1831 | wc -l
1433

$ ps -eT | grep 1831 | grep -i cb
1831 11539 ?01:45:52 cb-timer-1-1
1831 11541 ?00:11:53 cb-events
1831 11542 ?00:10:13 cb-tracing-1
1831 11543 ?00:10:08 cb-orphan-1
1831 11545 ?00:04:47 cb-comp-1
1831 11546 ?00:06:41 cb-comp-2
1831 11547 ?00:34:57 cb-io-kv-5-1
1831 11549 ?00:17:48 cb-io-kv-5-2
1831 24911 ?00:43:07 cb-timer-1-1
1831 24912 ?00:05:34 cb-events
1831 24913 ?00:04:02 cb-tracing-1
1831 24914 ?00:04:02 cb-orphan-1
1831 24915 ?00:02:35 cb-comp-1
1831 24916 ?00:04:17 cb-comp-2
1831 24917 ?00:23:41 cb-io-kv-5-1
1831 24919 ?00:13:08 cb-io-kv-5-2
1831 24966 ?00:44:37 cb-timer-1-1
1831 24967 ?00:05:43 cb-events
...
If there is any issue in connecting to Couchbase from the Flink jobs running on 
Flink task managers then those may result in errors being logged in task 
manager or cause some other issue (e.g. CPU) in task managers. However, I am 
wondering why high CPU is seen on Flink Job Manager instead and not on Flink 
task managers. Please help in understanding possible reasons for creation of 
numerous cb-* threads on Flink Job Manager. If you have any other suggestions 
or commands to better troubleshoot this issue, those are welcome too.
Thank you.
Kind regards,
Vinaya



Re: Flink1.12 批处理模式,分词统计时单词个数为1的单词不会被打印

2021-01-28 Thread Xintong Song
你用的应该是 1.12.0 版本吧。这是一个已知问题 [1],升级到 1.12.1 有修复。

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-20764

On Thu, Jan 28, 2021 at 4:55 PM xhyan0427 <15527609...@163.com> wrote:

> 代码:
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setRuntimeMode(RuntimeExecutionMode.BATCH)  // 在DataStream
> API上以批处理方式执行
>
> // 本地测试文件
> val inputStream =
> env.readTextFile(getClass.getResource("/hello.txt").getPath)
>
> // 分词统计,问题:批处理模式的时候,sum 为 1 的单词不会被打印
> val resultStream = inputStream
>   .flatMap(_.split(","))
>   .filter(_.nonEmpty)
>   .map((_, 1))
>   .keyBy(_._1)
>   .sum(1)
> resultStream.print()
> env.execute("word count")
>
> 测试文件的数据内容:
> hello,flink
> hello,flink
> hello,hive
> hello,hive
> hello,hbase
> hello,hbase
> hello,scala
> hello,kafka
> hello,kafka
>
>
> 测试结果:hello/flink/hive/hbase/kafka的和大于1,会打印出来;但是 scala的个数为1,不会被打印出来
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


key group from xx to yy does not contain zz异常

2021-01-28 Thread restart
线上部署flink项目时,启动爆如下错误,在测试环境可正常启动,job依赖的flink版本是1.10,flink
集群版本是1.12-SNAPSHOT,与线上一致。有点摸不着头脑,麻烦各位老师帮分析分析
堆栈信息:
java.lang.IllegalArgumentException: key group from 44 to 45 does not contain
4
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:187)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:182)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:176)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:112)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:217)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:884)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:42)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:898)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:399)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:567)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)

代码逻辑大致:
DataStream stream = dataStream
.keyBy(keyBy(globalParallelism))
.window(window(downsampling))
.reduce(reduce(trackerType), processWindow(trackerType),
TypeInformation.of(Metrics.class))
.keyBy(secondKeyBy())
   
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.reduce(reduce(trackerType),
processSecondWindow(trackerType), TypeInformation.of(Metrics.class))
.rebalance()
.addSink(sink())
.setParallelism(globalParallelism/2);

public KeySelector keyBy(int parallelism) {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),ThreadLocalRandom.current().nextInt(parallelism));
}

public KeySelector secondKeyBy() {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),
value.getWindowEnd());
}
备注:二次keyby的原因是为了解决数据倾斜问题,第一个keyby用来基于EventTime的翻滚窗口,第二个keyby使用了基于processTime的session窗口



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: 关于端到端的延迟监控

2021-01-28 Thread 13051111332


谢谢大家,清楚了


| |
1305332
|
|
1305...@163.com
|
签名由网易邮箱大师定制


在2021年01月28日 17:56,Jessica.J.Wang 写道:
官方的LatencyMarker 表示的是数据的流通性, 他和数据是在同一个pipeline 中顺序处理的,如果你的算子都是同步的情况
是可以反应出数据的真实处理延迟,生产上是可以使用的,但是 延迟粒度 metrics.latency.granularity 最好调整成 single或者
operator ,防止latency上报太多 压垮服务


但当你的算子是个异步 用AsyncWaitOperator实现的话,因为latencyMarker并没有像watermark一样
addToWorkQueue,直接处理上报metrics,所以延迟信息就不准确了

所以自己做端到端延迟的话,可以flink sql source 层 抽取其event time时间往下游发送,insert into sink的时候
,写一个udf (currenttime-eventime) 计算其延迟时间,写到外部数据库中,sink最好是influxdb之类的,方便统计






--
Sent from: http://apache-flink.147419.n8.nabble.com/

检查点成功,但从检查点恢复失败。使用了guava的bloomFilter,有人帮忙分析下吗?

2021-01-28 Thread 赵一旦
如下,我使用到了Guava的BloomFilter,貌似是基于kyro序列化的。检查点成功了,但是基于检查点恢复任务是失败的。
报错堆栈如下,关键错误是什么无法访问public修饰的成员?

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.streamOperatorStateContext(
StreamTaskStateInitializerImpl.java:235)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.initializeState(AbstractStreamOperator.java:248)
at org.apache.flink.streaming.runtime.tasks.OperatorChain
.initializeStateAndOpenOperators(OperatorChain.java:400)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.lambda$beforeInvoke$2(StreamTask.java:507)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
StreamTask.java:501)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5/
30) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.keyedStatedBackend(
StreamTaskStateInitializerImpl.java:316)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.streamOperatorStateContext(
StreamTaskStateInitializerImpl.java:155)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.filesystem.FsStateBackend
.createKeyedStateBackend(FsStateBackend.java:540)
at org.apache.flink.runtime.state.filesystem.FsStateBackend
.createKeyedStateBackend(FsStateBackend.java:100)
at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(
StateBackend.java:178)
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
StreamTaskStateInitializerImpl.java:299)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
instance of class: com.google.common.hash.LongAdder
Serialization trace:
bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray)
bits (com.google.common.hash.BloomFilter)
at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
136)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(
FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
.deserialize(KryoSerializer.java:346)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders
.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
at org.apache.flink.runtime.state.
KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(
KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation
.readKeyGroupStateData(HeapRestoreOperation.java:299)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation
.readStateHandleStateData(HeapRestoreOperation.java:260)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(
HeapRestoreOperation.java:160)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder

Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-28 Thread Jark Wu
Hi Sebastián,

Could you try to add combine.children="append" attribute to the
transformers configuration?
You can also see the full shade plugin configuration here [1].

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/#transform-table-connectorformat-resources

On Thu, 28 Jan 2021 at 17:28, Sebastián Magrí  wrote:

> Hi Jark!
>
> Please find the full pom file attached.
>
> Best Regards,
>
> On Thu, 28 Jan 2021 at 03:21, Jark Wu  wrote:
>
>> Hi Sebastián,
>>
>> I think Dawid is right.
>>
>> Could you share the pom file? I also tried to
>> package flink-connector-postgres-cdc with ServicesResourceTransformer, and
>> the Factory file contains
>>
>> com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory
>>
>>
>> Best,
>> Jark
>>
>>
>> On Tue, 26 Jan 2021 at 21:17, Sebastián Magrí 
>> wrote:
>>
>>> Thanks a lot for looking into it Dawid,
>>>
>>> In the
>>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
>>> file I only see
>>>
>>> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
>>>
>>> Even after applying the ServicesResourceTransformer.
>>>
>>>
>>> On Tue, 26 Jan 2021 at 11:58, Dawid Wysakowicz 
>>> wrote:
>>>
 Hi,

 Unfortunately I am not familiar with the packaging of
 flink-connector-postgres-cdc. Maybe @Jark could help here?

 However, I think the problem that you cannot find the connector is
 caused because of lack of entry in the resulting Manifest file. If there
 are overlapping classes maven does not exclude whole dependencies, but
 rather picks the overlapping class from one of the two. Could you check if
 you see entries for all tables in
 src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.

 If not, you could try applying the ServicesResourceTransformer[1]

 Best,

 Dawid

 [1]
 https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
 On 26/01/2021 12:29, Sebastián Magrí wrote:

 Hi!

 I've reported an issue with the postgresql-cdc connector apparently
 caused by the maven shade plugin excluding either the JDBC connector or the
 cdc connector due to overlapping classes. The issue for reference is here:

 https://github.com/ververica/flink-cdc-connectors/issues/90

 In the meantime, however, I've been trying to figure out if I can set
 up an exclusion rule to fix this in my pom.xml file, without success.

 The `org.postgresql:postgresql` dependency is being added manually by
 me to have a sink on a postgresql table and injected by the cdc connector
 seemingly via its debezium connector dependency.

 Any guidance or hints I could follow would be really appreciated.

 --
 Sebastián Ramírez Magrí


>>>
>>> --
>>> Sebastián Ramírez Magrí
>>>
>>
>
> --
> Sebastián Ramírez Magrí
>


Re: 关于端到端的延迟监控

2021-01-28 Thread Jessica.J.Wang
官方的LatencyMarker 表示的是数据的流通性, 他和数据是在同一个pipeline 中顺序处理的,如果你的算子都是同步的情况
是可以反应出数据的真实处理延迟,生产上是可以使用的,但是 延迟粒度 metrics.latency.granularity 最好调整成 single或者
operator ,防止latency上报太多 压垮服务


但当你的算子是个异步 用AsyncWaitOperator实现的话,因为latencyMarker并没有像watermark一样
addToWorkQueue,直接处理上报metrics,所以延迟信息就不准确了

所以自己做端到端延迟的话,可以flink sql source 层 抽取其event time时间往下游发送,insert into sink的时候
,写一个udf (currenttime-eventime) 计算其延迟时间,写到外部数据库中,sink最好是influxdb之类的,方便统计






--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: JobManager seems to be leaking temporary jar files

2021-01-28 Thread Chesnay Schepler
Code-wise it appears that thing have gotten simpler and we can use use a 
URLClassLoader within PackagedProgram.


We probably won't get around a dedicated close() method on the 
PackagedProgram.


I think in FLINK-21164 I think have identified the right places to issue 
this call within the jar handlers.


On the CLI side, I suppose we can just replace all usages of 
deleteTempExtractedLibraries with close().



On 1/28/2021 7:47 AM, Maciek Próchniak wrote:


Hi Chesnay,

thanks for reply. I wonder if FLINK-21164 will help without FLINK-9844 
- if the jar file is not closed, it won't be successfully deleted?


As for FLINK-9844 - I understand that having code like

if (userClassLoader instanceof Closeable) { ((Closeable) 
userClassloader).close() }


is too "dirty trick" to be considered?


thanks,

maciek


On 27.01.2021 13:00, Chesnay Schepler wrote:
The problem of submitted jar files not being closed is a known one: 
https://issues.apache.org/jira/browse/FLINK-9844

IIRC it's not exactly trivial to fix since class-loading is involved.
It's not strictly related to the REST API; it also occurs in the CLI 
but is less noticeable since jars are usually not deleted.


As for the issue with deleteExtractedLibraries, Maciek is generally 
on a good track.
The explicit delete call is indeed missing. The best place to put is 
probably JarRunHandler#handleRequest, within handle after the job was 
run.

A similar issue also exists in the JarPlanHandler.

I've opened https://issues.apache.org/jira/browse/FLINK-21164 to fix 
this issue.


On 1/26/2021 12:21 PM, Maciek Próchniak wrote:


Hi Matthias,

I think the problem lies somewhere in JarRunHandler, as this is the 
place where the files are created.


I think these are not the files that are managed via BlobService, as 
they are not stored in BlobService folders (I made experiment 
changing default BlobServer folders).


It seems to me that CliFrontend deletes those files explicitly:

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L250

whereas I couldn't find such invocation in JarRunHandler (not 
deleting those files does not fully explain leak on heap though...)



thanks,

maciek

On 26.01.2021 11:16, Matthias Pohl wrote:

Hi Maciek,
my understanding is that the jars in the JobManager should be 
cleaned up after the job is terminated (I assume that your jobs 
successfully finished). The jars are managed by the BlobService. 
The dispatcher will trigger the jobCleanup in [1] after job 
termination. Are there any suspicious log messages that might 
indicate an issue?

I'm adding Chesnay to this thread as he might have more insights here.

[1] 
https://github.com/apache/flink/blob/2c4e0ab921ccfaf003073ee50faeae4d4e4f4c93/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L797 



On Mon, Jan 25, 2021 at 8:37 PM Maciek Próchniak > wrote:


Hello,

in our setup we have:

- Flink 1.11.2

- job submission via REST API (first we upload jar, then we submit
multiple jobs with it)

- additional jars embedded in lib directory of main jar (this
is crucial
part)

When we submit jobs this way, Flink creates new temp jar files via
PackagedProgram.extractContainedLibraries method.

We observe that they are not removed after job finishes - it
seems that
PackagedProgram.deleteExtractedLibraries is not invoked when
using REST
API.

What's more, it seems that those jars remain open in JobManager
process.
We observe that when we delete them manually via scripts, the
disk space
is not reclaimed until process is restarted, we also see via
heap dump
inspection that java.util.zip.ZipFile$Source  objects remain,
pointing
to those files. This is quite a problem for us, as we submit
quite a few
jobs, and after a while we ran out of either heap or disk space on
JobManager process/host. Unfortunately, I cannot so far find
where this
leak would happen...

Does anybody have some pointers where we can search? Or how to
fix this
behaviour?


thanks,

maciek







Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-28 Thread Sebastián Magrí
Hi Jark!

Please find the full pom file attached.

Best Regards,

On Thu, 28 Jan 2021 at 03:21, Jark Wu  wrote:

> Hi Sebastián,
>
> I think Dawid is right.
>
> Could you share the pom file? I also tried to
> package flink-connector-postgres-cdc with ServicesResourceTransformer, and
> the Factory file contains
>
> com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory
>
>
> Best,
> Jark
>
>
> On Tue, 26 Jan 2021 at 21:17, Sebastián Magrí 
> wrote:
>
>> Thanks a lot for looking into it Dawid,
>>
>> In the
>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
>> file I only see
>>
>> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
>>
>> Even after applying the ServicesResourceTransformer.
>>
>>
>> On Tue, 26 Jan 2021 at 11:58, Dawid Wysakowicz 
>> wrote:
>>
>>> Hi,
>>>
>>> Unfortunately I am not familiar with the packaging of
>>> flink-connector-postgres-cdc. Maybe @Jark could help here?
>>>
>>> However, I think the problem that you cannot find the connector is
>>> caused because of lack of entry in the resulting Manifest file. If there
>>> are overlapping classes maven does not exclude whole dependencies, but
>>> rather picks the overlapping class from one of the two. Could you check if
>>> you see entries for all tables in
>>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.
>>>
>>> If not, you could try applying the ServicesResourceTransformer[1]
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1]
>>> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>>> On 26/01/2021 12:29, Sebastián Magrí wrote:
>>>
>>> Hi!
>>>
>>> I've reported an issue with the postgresql-cdc connector apparently
>>> caused by the maven shade plugin excluding either the JDBC connector or the
>>> cdc connector due to overlapping classes. The issue for reference is here:
>>>
>>> https://github.com/ververica/flink-cdc-connectors/issues/90
>>>
>>> In the meantime, however, I've been trying to figure out if I can set up
>>> an exclusion rule to fix this in my pom.xml file, without success.
>>>
>>> The `org.postgresql:postgresql` dependency is being added manually by me
>>> to have a sink on a postgresql table and injected by the cdc connector
>>> seemingly via its debezium connector dependency.
>>>
>>> Any guidance or hints I could follow would be really appreciated.
>>>
>>> --
>>> Sebastián Ramírez Magrí
>>>
>>>
>>
>> --
>> Sebastián Ramírez Magrí
>>
>

-- 
Sebastián Ramírez Magrí
http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;>
	4.0.0

	org.sebasmagri
	stats-aggregator
	1.0-SNAPSHOT
	jar

	Stats Aggregator

	
		UTF-8
		1.12.1
		2.11
		2.11.12
		2.12.1
		42.2.18.jre7
	

	
		
			org.apache.flink
			flink-scala_${scala.binary.version}
			${flink.version}
			provided
		
		
			org.apache.flink
			flink-streaming-scala_${scala.binary.version}
			${flink.version}
			provided
		
		
			org.apache.flink
			flink-clients_${scala.binary.version}
			${flink.version}
			provided
		

		
		
			org.scala-lang
			scala-library
			${scala.version}
			provided
		

		

		
		
			com.alibaba.ververica
			flink-connector-postgres-cdc
			1.1.0
		
		
			org.apache.flink
			flink-connector-jdbc_${scala.binary.version}
			${flink.version}
		
		
			org.postgresql
			postgresql
			${postgresql.version}
		

		
		
		
			org.apache.logging.log4j
			log4j-slf4j-impl
			${log4j.version}
			runtime
		
		
			org.apache.logging.log4j
			log4j-api
			${log4j.version}
			runtime
		
		
			org.apache.logging.log4j
			log4j-core
			${log4j.version}
			runtime
		

		

		
			org.apache.flink
			flink-table-api-scala-bridge_${scala.binary.version}
			${flink.version}
			provided
		
		
			org.apache.flink
			flink-table-planner-blink_${scala.binary.version}
			${flink.version}
			provided
		
		
			org.apache.flink
			flink-streaming-scala_${scala.binary.version}
			${flink.version}
			provided
		

	

	
		
			
			
			
org.apache.maven.plugins
maven-shade-plugin
3.1.1

	
	
		package
		
			shade
		
		
			
			

	org.apache.flink:force-shading
	com.google.code.findbugs:jsr305
	org.slf4j:*
	org.apache.logging.log4j:*

			
			

	
	*:*
	
		META-INF/*.SF
		META-INF/*.DSA
		META-INF/*.RSA
	

			
			


	org.sebasmagri.StatsAggregator

			
		
	

			

			
			
org.apache.maven.plugins
maven-compiler-plugin
3.1

	1.8
	1.8

			

			
			
net.alchim31.maven
scala-maven-plugin
3.2.2

	
		
			compile
			testCompile
		
	


	
		-nobootcp
	

			

			
			
org.apache.maven.plugins

Re: What causes a buffer pool exception? How can I mitigate it?

2021-01-28 Thread Arvid Heise
Also could you please provide the jobmanager log? It could also be that the
underlying failure is somewhere else.

On Thu, Jan 28, 2021 at 10:17 AM Arvid Heise  wrote:

> Hi Marco,
>
> In general, sending a compressed log to ML is totally fine. You can
> further minimize the log by disabling restarts.
> I looked into the logs that you provided.
>
> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>  [] - Attempting to cancel task forward fill -> (Sink: tag
>> db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2)
>> (8c1f256176fb89f112c27883350a02bc).
>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>  [] - forward fill -> (Sink: tag db sink, Sink: back fill
>> db sink, Sink: min max step db sink) (2/2)
>> (8c1f256176fb89f112c27883350a02bc) switched from RUNNING to CANCELING.
>> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>>  [] - Triggering cancellation of task code forward fill ->
>> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink)
>> (2/2) (8c1f256176fb89f112c27883350a02bc).
>> 2021-01-26 04:37:43,282 ERROR
>> xx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction []
>> - Error in timer.
>> java.lang.RuntimeException: Buffer pool is destroyed.
>>
>
> I can see that my suspicion is most likely correct: It first tries to
> cancel the task for some reason and then a later timer will show you the
> respective error. I created the ticket to resolve the issue [1]. There may
> also be an issue of swalled interruption exceptions, which we are looking
> into in parallel.
>
> However, there is a reason why the task is canceling in the first place
> and we need to find that. I recommend to not have a try-catch block around
> *collector.collect* in *ForwardFillKeyedProcessFunction*. Just have it
> around your user code but not around system calls. This may swallow the
> real cause.
>
> Are you executing the code in IDE? You may be able to set some breakpoints
> to quickly figure out what's going wrong (I can help then).
>
> [1] https://issues.apache.org/jira/browse/FLINK-21181
>
> On Wed, Jan 27, 2021 at 8:54 AM Arvid Heise  wrote:
>
>> Hi Marco,
>>
>> could you share your full task manager and job manager log? We
>> double-checked and saw that the buffer pool is only released on
>> cancellation or shutdown.
>>
>> So I'm assuming that there is another issue (e.g., Kafka cluster not
>> reachable) and there is a race condition while shutting down. It seems like
>> the buffer pool exception is shadowing the actual cause then for yet
>> unknown reasons (this is an issue on its own, but you should be able to see
>> the actual issue in task manager log).
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <
>> mvillalo...@kineteque.com> wrote:
>>
>>> Actually, the log I sent in my previous message, shows the only error
>>> that occurred before the buffer pool was destroyed. That
>>> intermittent warning:
>>>
>>> 2021-01-26 04:14:33,140 WARN
>>>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
>>> Committing offsets to Kafka takes longer than the checkpoint interval.
>>> Skipping commit of previous offsets because newer complete checkpoint
>>> offsets are available. This does not compromise Flink's checkpoint
>>> integrity.
>>> 2021-01-26 04:14:33,143 INFO
>>>  org.apache.kafka.clients.FetchSessionHandler [] -
>>> [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
>>> request (sessionId=936633685, epoch=1) to node 2: {}.
>>> org.apache.kafka.common.errors.DisconnectException: null
>>>
>>> I know that probably doesn't help much. Sorry.
>>>
>>> On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise  wrote:
>>>
 Hi Marco,

 the network buffer pool is destroyed when the task manager is shutdown.
 Could you check if you have an error before that in your log?

 It seems like the timer is triggered at a point where it shouldn't.
 I'll check if there is a known issue that has been fixed in later versions.
 Do you have the option to upgrade to 1.11.3?

 Best,

 Arvid

 On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <
 mvillalo...@kineteque.com> wrote:

> Hi.  What causes a buffer pool exception? How can I mitigate it? It is
> causing us plenty of problems right now.
>
> 2021-01-26 04:14:33,041 INFO
>  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
> Subtask 1 received completion notification for checkpoint with id=4.
> 2021-01-26 04:14:33,140 WARN
>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
> Committing offsets to Kafka takes longer than the checkpoint interval.
> Skipping commit of previous offsets because newer complete checkpoint
> offsets are available. This does not compromise Flink's checkpoint
> 

Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-28 Thread Jessica.J.Wang
你使用的是什么窗口呢,是 Tumble或者Hop吗,如果没数据 但是想提前输出结果可以用 emit
https://help.aliyun.com/document_detail/98951.html?spm=5176.11065259.1996646101.searchclickresult.513d1037M5VADa



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: What causes a buffer pool exception? How can I mitigate it?

2021-01-28 Thread Arvid Heise
Hi Marco,

In general, sending a compressed log to ML is totally fine. You can further
minimize the log by disabling restarts.
I looked into the logs that you provided.

2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>[] - Attempting to cancel task forward fill -> (Sink: tag db
> sink, Sink: back fill db sink, Sink: min max step db sink) (2/2)
> (8c1f256176fb89f112c27883350a02bc).
> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>  [] - forward fill -> (Sink: tag db sink, Sink: back fill
> db sink, Sink: min max step db sink) (2/2)
> (8c1f256176fb89f112c27883350a02bc) switched from RUNNING to CANCELING.
> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task
>  [] - Triggering cancellation of task code forward fill ->
> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink)
> (2/2) (8c1f256176fb89f112c27883350a02bc).
> 2021-01-26 04:37:43,282 ERROR
> xx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction []
> - Error in timer.
> java.lang.RuntimeException: Buffer pool is destroyed.
>

I can see that my suspicion is most likely correct: It first tries to
cancel the task for some reason and then a later timer will show you the
respective error. I created the ticket to resolve the issue [1]. There may
also be an issue of swalled interruption exceptions, which we are looking
into in parallel.

However, there is a reason why the task is canceling in the first place and
we need to find that. I recommend to not have a try-catch block around
*collector.collect* in *ForwardFillKeyedProcessFunction*. Just have it
around your user code but not around system calls. This may swallow the
real cause.

Are you executing the code in IDE? You may be able to set some breakpoints
to quickly figure out what's going wrong (I can help then).

[1] https://issues.apache.org/jira/browse/FLINK-21181

On Wed, Jan 27, 2021 at 8:54 AM Arvid Heise  wrote:

> Hi Marco,
>
> could you share your full task manager and job manager log? We
> double-checked and saw that the buffer pool is only released on
> cancellation or shutdown.
>
> So I'm assuming that there is another issue (e.g., Kafka cluster not
> reachable) and there is a race condition while shutting down. It seems like
> the buffer pool exception is shadowing the actual cause then for yet
> unknown reasons (this is an issue on its own, but you should be able to see
> the actual issue in task manager log).
>
> Best,
>
> Arvid
>
> On Tue, Jan 26, 2021 at 5:13 PM Marco Villalobos <
> mvillalo...@kineteque.com> wrote:
>
>> Actually, the log I sent in my previous message, shows the only error
>> that occurred before the buffer pool was destroyed. That
>> intermittent warning:
>>
>> 2021-01-26 04:14:33,140 WARN
>>  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
>> Committing offsets to Kafka takes longer than the checkpoint interval.
>> Skipping commit of previous offsets because newer complete checkpoint
>> offsets are available. This does not compromise Flink's checkpoint
>> integrity.
>> 2021-01-26 04:14:33,143 INFO
>>  org.apache.kafka.clients.FetchSessionHandler [] -
>> [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
>> request (sessionId=936633685, epoch=1) to node 2: {}.
>> org.apache.kafka.common.errors.DisconnectException: null
>>
>> I know that probably doesn't help much. Sorry.
>>
>> On Mon, Jan 25, 2021 at 11:44 PM Arvid Heise  wrote:
>>
>>> Hi Marco,
>>>
>>> the network buffer pool is destroyed when the task manager is shutdown.
>>> Could you check if you have an error before that in your log?
>>>
>>> It seems like the timer is triggered at a point where it shouldn't. I'll
>>> check if there is a known issue that has been fixed in later versions. Do
>>> you have the option to upgrade to 1.11.3?
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <
>>> mvillalo...@kineteque.com> wrote:
>>>
 Hi.  What causes a buffer pool exception? How can I mitigate it? It is
 causing us plenty of problems right now.

 2021-01-26 04:14:33,041 INFO
  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] -
 Subtask 1 received completion notification for checkpoint with id=4.
 2021-01-26 04:14:33,140 WARN
  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] -
 Committing offsets to Kafka takes longer than the checkpoint interval.
 Skipping commit of previous offsets because newer complete checkpoint
 offsets are available. This does not compromise Flink's checkpoint
 integrity.
 2021-01-26 04:14:33,143 INFO
  org.apache.kafka.clients.FetchSessionHandler [] -
 [Consumer clientId=consumer-luminai-2, groupId=luminai] Error sending fetch
 request (sessionId=936633685, epoch=1) to node 2: {}.
 

Flink1.12 批处理模式,分词统计时单词个数为1的单词不会被打印

2021-01-28 Thread xhyan0427
代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
  
env.setRuntimeMode(RuntimeExecutionMode.BATCH)  // 在DataStream
API上以批处理方式执行

// 本地测试文件
val inputStream =
env.readTextFile(getClass.getResource("/hello.txt").getPath)

// 分词统计,问题:批处理模式的时候,sum 为 1 的单词不会被打印
val resultStream = inputStream
  .flatMap(_.split(","))
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(_._1)
  .sum(1)
resultStream.print()
env.execute("word count")

测试文件的数据内容:
hello,flink
hello,flink
hello,hive
hello,hive
hello,hbase
hello,hbase
hello,scala
hello,kafka
hello,kafka


测试结果:hello/flink/hive/hbase/kafka的和大于1,会打印出来;但是 scala的个数为1,不会被打印出来



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Timers not firing until stream end

2021-01-28 Thread Pilgrim Beart
Scratch that - your WatermarkStrategy DOES work (when I implement it
correctly!).
Well, almost: As you can see below (code pushed to repo), the Timer events
are still appearing somewhat late in the stream - 4 events late in this
case. It may be just good-enough for my purposes, though it will make
building test cases painful, so if you have any ideas how I could fix that,
would be much appreciated.

{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelled previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelled previous timer. "}
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000
msg_in.ts 4000 Cancelled previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000
msg_in.ts 4000 Cancelled previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000
msg_in.ts 5000 Cancelled previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 5000 Cancelled previous timer. "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000
msg_in.ts 6000 Cancelled previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000
msg_in.ts 6000 Cancelled previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000
msg_in.ts 7000 Cancelled previous timer. "}
{"ts":5000,"id":"1","is_online":false,"log":"timestamp is 5000"}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000
msg_in.ts 7000 Cancelled previous timer. "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000
msg_in.ts 7000 Cancelled previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelled previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelled previous timer. "}
{"ts":1,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 1 Cancelled previous timer. "}
{"ts":1,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 1 Cancelled previous timer. "}
{"ts":1,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 1 Cancelled previous timer. "}
{"ts":11000,"id":"1","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"2","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"0","is_online":false,"log":"timestamp is 11000"}
-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot

 +44 7961 125282
See our latest features

and book me

for
a video call.



On Thu, 28 Jan 2021 at 08:37, Pilgrim Beart 
wrote:

> Chesnay,
> I cannot reproduce this - I've tried the approaches you suggest, but
> nothing I've done makes the timers fire at the correct time in the stream -
> they only fire when the stream has ended. If you have an EventTime example
> where they fire at the right time in the stream, I'd love to see it. Or any
> ideas for other 

Re: Timers not firing until stream end

2021-01-28 Thread Pilgrim Beart
Chesnay,
I cannot reproduce this - I've tried the approaches you suggest, but
nothing I've done makes the timers fire at the correct time in the stream -
they only fire when the stream has ended. If you have an EventTime example
where they fire at the right time in the stream, I'd love to see it. Or any
ideas for other things to try? Could it perhaps be related to using a file
source?

Thanks,

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot

 +44 7961 125282
See our latest features

and book me

for
a video call.



On Wed, 27 Jan 2021 at 17:55, Chesnay Schepler  wrote:

> Actually, if the parallelism is 1 then it works as it should. sigh
>
> On 1/27/2021 6:52 PM, Chesnay Schepler wrote:
>
> Note that while this does fix the issue of timers not firing while the job
> is running, it seems to be firing too many timers...
>
> On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
>
> My bad, I was still using the custom WatermarkStrategy that emits a
> watermark for each event.
>
> .assignTimestampsAndWatermarks(
> new WatermarkStrategy() {
> @Override
> public WatermarkGenerator 
> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
> return new 
> BoundedOutOfOrdernessWatermarks(Duration.ofSeconds(1)) {
> @Override
> public void onEvent(Tevent, long eventTimestamp, 
> WatermarkOutput output) {
> super.onEvent(event, eventTimestamp, output);
> super.onPeriodicEmit(output);
> }
> };
> }
> }.withTimestampAssigner(...)
>
>
> @Aljoscha This is about Flink 1.11. Since the periodic watermarks are
> dependent on processing time, am I assuming correctly if the job finishes
> quickly that watermarks may never be emitted (except for those at the job)?
> Is there any way to emit periodic watermarks based on event time?
> Is there any way to enable punctuated watermarks for the existing
> watermark strategies without having to implement a custom one?
>
> On 1/27/2021 5:57 PM, Pilgrim Beart wrote:
>
> Chesnay,
> Thanks for this - I've made the change you suggested
> (setAutoWatermarkInterval) but it hasn't changed the behaviour - timers
> still get processed only on stream end.
> I have pushed a new version, with this change, and also emitting some
> information in a .log field.
> If you search for "!!!" in Ingest.java and DPTimeoutFunction.java you'll
> see the relevant changes.
>
> In DPTimeoutFunction you'll see that if I add code to say "cancel the
> timer only if it wouldn't have gone off" then the output is now correct -
> individual devices do timeout. However, this output only appears at the end
> of the stream (i.e. time jumps backwards as all the timers are processed)
> so I still appear not to be seeing timer processing at the correct event
> time. If there was no end of stream, I would never get any timeouts.
>
> Below is the output I get when I run. This output is correct but:
> a) only because I am manually cancelling timers in DPTimeoutFunction
> (search for "!!!")
> b) the timer events are timestamped correctly, but are not emitted into
> the stream at the right time - and if the stream didn't end then no
> timeouts would ever occur (which in particular means that devices that
> never come back online will never get marked as offline).
>
> Perhaps I do need to implement an onPeriodicEmit function? Does that
> require a customer watermark strategy? I can see how to define a custom
> watermark at link below, but unclear how to install that?
>
> https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy
>
> {"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
> {"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
> {"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
> {"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0
> msg_in.ts 1000 Cancelling previous timer. "}
> {"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0
> msg_in.ts 1000 Cancelling previous timer. "}
> {"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000
> msg_in.ts 2000 Cancelling previous timer. "}
> 

Re: flink slot communication

2021-01-28 Thread Piotr Nowojski
Hi,

Yes Dawid is correct. Communications between two tasks on the same
TaskManager are not going through the network, but via "local" channel
(`LocalInputChannel`). It's still serialising and deserializing the data,
but there are no network overheads, and local channels have only half of
the memory consumption of the "remote" channels. FYI, if you look at the
network metrics, there are a couple of metrics that make distinction
between local/remote data (like `numBytesInLocal` counter).

> slot 1-1 , slot 1-2, slot 1-3 share TCP connections communicate with slot
2-1, slot 2-2, slot 2-3

Yes, there is only a single TCP connection shared between a pair of
TaskManagers.

Piotrek

wt., 26 sty 2021 o 12:43 Dawid Wysakowicz 
napisał(a):

> Hi,
>
> If tasks end up in the same TaskManager, they us LocalInputChannel(s),
> which does not go through network, but reads directly from local partitions.
>
> I am also pulling in @Piotr who might give you some more insights, or
> correct me if I am wrong.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.html
>
> Best,
>
> Dawid
> On 26/01/2021 08:18, 耿延杰 wrote:
>
> Hi,
>
> I'm confused about slots communication in same taskmanager.
>
> Assume only one job which running on per-job cluster with parallalism = 6.
> Each taskmanager with 3 slot.
> There are 6 slot:
> slot 1-1, slot 1-2, slot 1-3,
> slot 2-1, slot 2-2 , slot 2-3
>
> Assume the job has 'KeyBy' operator, thus, eash 'keyby' in each slot will
> distribute data into
> all downstream opeators in every slots.
>
> I know that
>
> Tasks in the same JVM share TCP connections (via multiplexing) and
> heartbeat messages
>
> That means:
> slot 1-1 , slot 1-2, slot 1-3 share TCP connections communicate with slot
> 2-1, slot 2-2, slot 2-3
>
> My question is:
>
> how about communication type between slot 1-1 and slot 1-2?
> by thread-to-thread?
> or by network?
>
>


Re: 关于端到端的延迟监控

2021-01-28 Thread zelin jin
每一条records处理过程中透传开始时间,在sink算子通过metrics上报opentsdb、Prometheus

等时间序列数据库,最后通过grafana等可视化工具展示。

wpp <1215303...@qq.com> 于2021年1月28日周四 下午2:53写道:

> 这个延迟,只是给一个参考意义吧,
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-28 Thread 林影
Hi,刘小红:
这个我查阅过,实验过,通过idle source 是无法解决的。
可以查看这个链接:http://apache-flink.147419.n8.nabble.com/Flink-SQL-td4535.html

在我的这个场景里面,上游已经配置了idle source,上游如果突然没有数据了,下游的flink 窗口还是无法关闭输出结果。
这个说明idle source 无法解决这个问题

刘小红 <18500348...@163.com> 于2021年1月28日周四 下午3:01写道:

> 可以调用WatermarkStrategy.withIdleness(Duration idleTimeout)
> 指定空闲超时时间,这样不会影响水印的进度,进而影响下游算子操作
>
>
> | |
> 刘小红
> |
> |
> 18500348...@163.com
> |
> 签名由网易邮箱大师定制
> 在2021年1月28日 14:42,wpp<1215303...@qq.com> 写道:
> 可以按照proceeTime来处理吧
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>