Re: Re: Gradually increasing checkpoint size

2021-03-08 Thread Dan Hill
Hi Yun!

That advice was useful.  The state for that operator is very small (31kb).
Most of the checkpoint size is in a couple of simple DataStream.intervalJoin
operators.  The time intervals are fairly short.

I'm going to try running the code with some small configuration changes.
One thing I did notice is that I set a positive value for the
relativeUpperBound.  I'm not sure if I found a bug in IntervalJoinOperator.
The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for
cleanup.  It has some logic around cleaning up the right side that uses
timerTimestamp + lowerBound.
However, processElement doesn't use the same logic when creating a timer (I
only see + lowerBound).
Maybe I'm misreading the code.  It feels like a bug.
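For context, the kind of interval join being discussed looks roughly like the sketch
below (illustrative only: the event types, key extractor, and bounds are made up and
not taken from the actual job). The two arguments to between() are the relative
lower/upper bounds that the operator's cleanup timers are derived from.

DataStream<JoinedEvent> joined = leftStream
    .keyBy(Event::getKey)
    .intervalJoin(rightStream.keyBy(Event::getKey))
    .between(Time.minutes(-5), Time.minutes(5))   // relativeLowerBound, relativeUpperBound
    .process(new ProcessJoinFunction<Event, Event, JoinedEvent>() {
        @Override
        public void processElement(Event left, Event right, Context ctx,
                                   Collector<JoinedEvent> out) {
            out.collect(new JoinedEvent(left, right));
        }
    });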


On Mon, Mar 8, 2021 at 10:29 PM Yun Gao  wrote:

> Hi Dan,
>
> Regarding the original checkpoint size problem, could you also have a
> check
> which tasks' state are increasing from the checkpoint UI ? For example,
> the
> attached operator has an `alreadyOutputted` value state, which seems to keep
> increasing if there are always new keys ?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Dan Hill 
> *Send Date:*Tue Mar 9 00:59:24 2021
> *Recipients:*Yun Gao 
> *CC:*user 
> *Subject:*Re: Gradually increasing checkpoint size
>
>> Hi Yun!
>>
>> Thanks for the quick reply.
>>
>> One of the lowerBounds is large but the table being joined with is ~500
>> rows.  I also have my own operator that only outputs the first value.
>>
>> public class OnlyFirstUser<T> extends RichFlatMapFunction<T, T> {
>>
>>     private transient ValueState<Boolean> alreadyOutputted;
>>
>>     @Override
>>     public void flatMap(T value, Collector<T> out) throws Exception {
>>         if (!alreadyOutputted.value()) {
>>             alreadyOutputted.update(true);
>>             out.collect(value);
>>         }
>>     }
>>
>>     @Override
>>     public void open(Configuration config) {
>>         ValueStateDescriptor<Boolean> descriptor =
>>             new ValueStateDescriptor<>(
>>                 "alreadyOutputted", // the state name
>>                 TypeInformation.of(new TypeHint<Boolean>() {}), // type information
>>                 false); // default value of the state, if nothing was set
>>         alreadyOutputted = getRuntimeContext().getState(descriptor);
>>     }
>> }
>>
>> All of my inputs have this watermark strategy.  In the Flink UI, early in
>> the job run, I see "Low Watermarks" on each node and they increase.  After
>> some checkpoint failures, low watermarks stop appearing in the UI.
>>
>>
>> .assignTimestampsAndWatermarks(
>>
>>
>> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
>>
>>
>>
>> Thanks Yun!
>>
>>
>> On Mon, Mar 8, 2021 at 7:27 AM Yun Gao  wrote:
>>
>>> Hi Dan,
>>>
>>> Have you used a too-large upperBound or lowerBound?
>>>
>>> If not, could you also check the watermark strategy ?
>>> The interval join operator depends on the event-time
>>> timer for cleanup, and the event-time timer would be
>>> triggered via watermark.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*Dan Hill 
>>> *Send Date:*Mon Mar 8 14:59:48 2021
>>> *Recipients:*user 
>>> *Subject:*Gradually increasing checkpoint size
>>>
 Hi!

 I'm running a backfill Flink stream job over older data.  It has
 multiple interval joins.  I noticed my checkpoint is regularly gaining in
 size.  I'd expect my checkpoints to stabilize and not grow.

 Is there a setting to prune useless data from the checkpoint?  My top
 guess is that my checkpoint has a bunch of useless state in it.

 - Dan

>>>


Re: Frequent 'ResourceManager leader changed to new address null' exceptions causing job restarts

2021-03-08 Thread yidan zhao
Also, what settings would you all recommend? I'm probably just on the default G1, and I'm not sure whether G1 also needs fine-tuning.
The memory I've configured is fairly large (I have TaskManagers with 50 GB and even 100 GB heaps). With heaps this big, is there anything special that needs to be set?

Or is it worth splitting them into smaller ones, e.g. a 10 GB heap, and increasing the number of TaskManagers?

yidan zhao wrote on Tue, Mar 9, 2021 at 2:56 PM:

> OK, I'll take a look.
> Also, I noticed today that the GC collectors differ across several of my clusters.
> I've seen three so far; the default is G1. In the clusters where the flink conf sets env.java.opts:
> "-XX:-OmitStackTraceInFastThrow", two other collectors show up: one is Parallel GC with 83
> threads, and the other is Mark Sweep Compact GC.
> Does Flink adjust anything dynamically based on the memory size?
>
>
> I think I roughly understand why G1 isn't used: setting java.opts probably overrides the defaults rather than appending to them. All I actually wanted was to set -XX:-OmitStackTraceInFastThrow.
>
>
> 杨杰 <471419...@qq.com> wrote on Mon, Mar 8, 2021 at 3:09 PM:
>
>> Hi,
>>
>>   You could check the GC behavior; frequent full GCs can also cause these issues.
>>
>> Best,
>> jjiey
>>
>> > On Mar 8, 2021, at 14:37, yidan zhao wrote:
>> >
>> >
>> > As in the subject, I have a job that frequently hits this exception and then restarts. Today, an hour after the job started, the web UI showed no completed checkpoints and the restored count had already reached 8. The Exceptions page shows this error, and I suspect most of the restores were caused by it.
>> > Besides that, the 'Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>> > leadership' error also causes job restarts.
>> >
>> > Below is an error log from just now (environment: Flink 1.12, standalone cluster, 5 JMs + 5 TMs, with JMs and TMs co-located on the same machines):
>> > 2021-03-08 14:31:40
>> > org.apache.flink.runtime.io
>> .network.netty.exception.RemoteTransportException:
>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>> >at org.apache.flink.runtime.io.network.netty.
>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>> > CreditBasedPartitionRequestClientHandler.java:294)
>> >at org.apache.flink.runtime.io.network.netty.
>> > CreditBasedPartitionRequestClientHandler.channelRead(
>> > CreditBasedPartitionRequestClientHandler.java:183)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:379)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:365)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> > .java:357)
>> >at org.apache.flink.runtime.io.network.netty.
>> > NettyMessageClientDecoderDelegate.channelRead(
>> > NettyMessageClientDecoderDelegate.java:115)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:379)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:365)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> > .java:357)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >
>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>> > 1410)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:379)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:365)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
>> > AbstractEpollStreamChannel.java:792)
>> >at
>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>> > .processReady(EpollEventLoop.java:475)
>> >at
>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>> > .run(EpollEventLoop.java:378)
>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
>> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> >at java.lang.Thread.run(Thread.java:748)
>> > Caused by: org.apache.flink.runtime.io.network.partition.
>> > ProducerFailedException: org.apache.flink.util.FlinkException:
>> JobManager
>> > responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>> > .writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>> > .enqueueAvailableReader(PartitionRequestQueue.java:108)
>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>> > .userEventTriggered(PartitionRequestQueue.java:170)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeUserEventTriggered(
>> > AbstractChannelHandlerContext.java:346)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeUserEventTriggered(
>> > AbstractChannelHandlerContext.java:332)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.fireUserEventTriggered(
>> > 

Re: Frequent 'ResourceManager leader changed to new address null' exceptions causing job restarts

2021-03-08 Thread yidan zhao
OK, I'll take a look.
Also, I noticed today that the GC collectors differ across several of my clusters.
I've seen three so far; the default is G1. In the clusters where the flink conf sets env.java.opts:
"-XX:-OmitStackTraceInFastThrow", two other collectors show up: one is Parallel GC with 83
threads, and the other is Mark Sweep Compact GC.
Does Flink adjust anything dynamically based on the memory size?

I think I roughly understand why G1 isn't used: setting java.opts probably overrides the defaults rather than appending to them. All I actually wanted was to set -XX:-OmitStackTraceInFastThrow.


杨杰 <471419...@qq.com> wrote on Mon, Mar 8, 2021 at 3:09 PM:

> Hi,
>
>   You could check the GC behavior; frequent full GCs can also cause these issues.
>
> Best,
> jjiey
>
> > On Mar 8, 2021, at 14:37, yidan zhao wrote:
> >
> >
> As in the subject, I have a job that frequently hits this exception and then restarts. Today, an hour after the job started, the web UI showed no completed checkpoints and the restored count had already reached 8. The Exceptions page shows this error, and I suspect most of the restores were caused by it.
> > Besides that, the 'Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
> > leadership' error also causes job restarts.
> >
> > Below is an error log from just now (environment: Flink 1.12, standalone cluster, 5 JMs + 5 TMs, with JMs and TMs co-located on the same machines):
> > 2021-03-08 14:31:40
> > org.apache.flink.runtime.io
> .network.netty.exception.RemoteTransportException:
> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
> >at org.apache.flink.runtime.io.network.netty.
> > CreditBasedPartitionRequestClientHandler.decodeMsg(
> > CreditBasedPartitionRequestClientHandler.java:294)
> >at org.apache.flink.runtime.io.network.netty.
> > CreditBasedPartitionRequestClientHandler.channelRead(
> > CreditBasedPartitionRequestClientHandler.java:183)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:379)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:365)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> > .java:357)
> >at org.apache.flink.runtime.io.network.netty.
> > NettyMessageClientDecoderDelegate.channelRead(
> > NettyMessageClientDecoderDelegate.java:115)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:379)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:365)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> > .java:357)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >
> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
> > 1410)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:379)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:365)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
> > AbstractEpollStreamChannel.java:792)
> >at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
> > .processReady(EpollEventLoop.java:475)
> >at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
> > .run(EpollEventLoop.java:378)
> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> >at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.runtime.io.network.partition.
> > ProducerFailedException: org.apache.flink.util.FlinkException: JobManager
> > responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
> > .writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
> > .enqueueAvailableReader(PartitionRequestQueue.java:108)
> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
> > .userEventTriggered(PartitionRequestQueue.java:170)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeUserEventTriggered(
> > AbstractChannelHandlerContext.java:346)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeUserEventTriggered(
> > AbstractChannelHandlerContext.java:332)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.fireUserEventTriggered(
> > AbstractChannelHandlerContext.java:324)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >
> ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter
> > .java:117)
> >at org.apache.flink.shaded.netty4.io.netty.handler.codec.
> > 

Re: [ANNOUNCE] Apache Flink 1.12.2 released

2021-03-08 Thread Leonard Xu
Well done! Thanks to Roman and Yuan, and everyone who contributed to the 
release.

Best,
Leonard

> On Mar 9, 2021, at 11:39, Zhu Zhu wrote:
> 
> Thanks Roman and Yuan for being the release managers! Thanks everyone who has 
> made this release possible!
> 
> Cheers,
> Zhu
> 
> Piotr Nowojski <pnowoj...@apache.org> wrote on Sat, Mar 6, 2021 at 12:38 AM:
> Thanks Roman and Yuan for your work and driving the release process :)
> 
> On Fri, Mar 5, 2021 at 15:53, Till Rohrmann wrote:
> Great work! Thanks a lot for being our release managers Roman and Yuan and
> to everyone who has made this release possible.
> 
> Cheers,
> Till
> 
> On Fri, Mar 5, 2021 at 10:43 AM Yuan Mei  > wrote:
> 
> > Cheers!
> >
> > Thanks, Roman, for doing the most time-consuming and difficult part of the
> > release!
> >
> > Best,
> >
> > Yuan
> >
> > On Fri, Mar 5, 2021 at 5:41 PM Roman Khachatryan  > > wrote:
> >
> > > The Apache Flink community is very happy to announce the release of
> > Apache
> > > Flink 1.12.2, which is the second bugfix release for the Apache Flink
> > 1.12
> > > series.
> > >
> > > Apache Flink® is an open-source stream processing framework for
> > > distributed, high-performing, always-available, and accurate data
> > streaming
> > > applications.
> > >
> > > The release is available for download at:
> > > https://flink.apache.org/downloads.html 
> > > 
> > >
> > > Please check out the release blog post for an overview of the
> > improvements
> > > for this bugfix release:
> > > https://flink.apache.org/news/2021/03/03/release-1.12.2.html 
> > > 
> > >
> > > The full release notes are available in Jira:
> > >
> > >
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502=12315522
> > >
> > > We would like to thank all contributors of the Apache Flink community who
> > > made this release possible!
> > >
> > > Special thanks to Yuan Mei for managing the release and PMC members
> > Robert
> > > Metzger, Chesnay Schepler and Piotr Nowojski.
> > >
> > > Regards,
> > > Roman
> > >
> >



Re: Re: Gradually increasing checkpoint size

2021-03-08 Thread Yun Gao
Hi Dan,

Regarding the original checkpoint size problem, could you also have a check 
which tasks' state are increasing from the checkpoint UI ? For example, the 
attached operator has an `alreadyOutputted` value state, which seems to keep
increasing if there are always new keys ?

Best,
Yun
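
One setting that directly targets pruning of per-key state such as `alreadyOutputted`
is state TTL. A minimal sketch (the TTL duration and cleanup options below are
illustrative assumptions, not something recommended in this thread):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

// Expire the per-key flag some time after it is written, so state for keys that
// are never seen again is eventually dropped.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupFullSnapshot()
        .build();

ValueStateDescriptor<Boolean> descriptor =
        new ValueStateDescriptor<>("alreadyOutputted", Types.BOOLEAN);
descriptor.enableTimeToLive(ttlConfig);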



 --Original Mail --
Sender:Dan Hill 
Send Date:Tue Mar 9 00:59:24 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Gradually increasing checkpoint size

Hi Yun!

Thanks for the quick reply.

One of the lowerBounds is large but the table being joined with is ~500 rows.  
I also have my own operator that only outputs the first value.

public class OnlyFirstUser<T> extends RichFlatMapFunction<T, T> {

    private transient ValueState<Boolean> alreadyOutputted;

    @Override
    public void flatMap(T value, Collector<T> out) throws Exception {
        if (!alreadyOutputted.value()) {
            alreadyOutputted.update(true);
            out.collect(value);
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Boolean> descriptor =
            new ValueStateDescriptor<>(
                "alreadyOutputted", // the state name
                TypeInformation.of(new TypeHint<Boolean>() {}), // type information
                false); // default value of the state, if nothing was set
        alreadyOutputted = getRuntimeContext().getState(descriptor);
    }
}

All of my inputs have this watermark strategy.  In the Flink UI, early in the 
job run, I see "Low Watermarks" on each node and they increase.  After some 
checkpoint failures, low watermarks stop appearing in the UI.


.assignTimestampsAndWatermarks(
 
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));


Thanks Yun!


On Mon, Mar 8, 2021 at 7:27 AM Yun Gao  wrote:

Hi Dan,

Have you used a too-large upperBound or lowerBound?

If not, could you also check the watermark strategy ?
The interval join operator depends on the event-time
timer for cleanup, and the event-time timer would be
triggered via watermark. 

Best,
Yun



 --Original Mail --
Sender:Dan Hill 
Send Date:Mon Mar 8 14:59:48 2021
Recipients:user 
Subject:Gradually increasing checkpoint size

Hi!

I'm running a backfill Flink stream job over older data.  It has multiple 
interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd 
expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint?  My top guess is 
that my checkpoint has a bunch of useless state in it.

- Dan

Future of QueryableState

2021-03-08 Thread Maciek Próchniak

Hello,


We are using QueryableState in some Nussknacker deployments as a nice
addition, allowing end users to peek inside job state for a given key
(we mostly use custom operators).



Judging by mailing list and feature radar proposition by Stephan: 
https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg 



this feature is not widely used/supported. I'd like to ask:

- are there any alternative ways of accessing state during job
execution? The State API is very nice, but it operates on checkpoints, and
loading the whole state to look up one key seems a bit heavy?


- are there any inherent problems in QueryableState design (e.g. it's 
not feasible to use it in K8 settings, performance considerations) or 
just lack of interest/support (in that case we may offer some help)?



thanks,

maciek
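
For reference, the checkpoint-based path mentioned in the first question (presumably
the State Processor API) looks roughly like the sketch below. The savepoint path,
operator uid, and state name are illustrative assumptions; it materializes the whole
keyed state rather than doing a point lookup, which is the heaviness referred to above.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadStateFromSavepoint {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
            Savepoint.load(env, "file:///tmp/savepoints/savepoint-xyz", new MemoryStateBackend());

        // Reads every key of the operator with uid "my-operator"; there is no
        // single-key lookup, hence the overhead of peeking at just one key.
        savepoint.readKeyedState("my-operator", new CounterReader()).print();
    }

    static class CounterReader extends KeyedStateReaderFunction<String, String> {
        private ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            counter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + counter.value());
        }
    }
}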



Re: flink Application Native k8s using OSS as the backend, occasional errors in the log

2021-03-08 Thread seuzxc
Has this problem been solved? I'm seeing the same error message.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Can Flink RocksDBStateBackend use Alibaba Cloud OSS storage?

2021-03-08 Thread seuzxc
I configured OSS and the state can be written, but the OSS log keeps showing the message below. Has anyone run into this?

2021-03-08 20:18:58.512  [INFO][cluster-io-thread-2]:
o.a.f.f.o.s.c.a.o.c.u.LogUtils 66 logException - [Server]Unable to execute
HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 604616328586350B9C61
[HostId]: null




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: rowtime type serialization issue

2021-03-08 Thread JudeZhu
I ran into the same problem. How was it resolved in the end?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: rowtime type serialization issue

2021-03-08 Thread JudeZhu
I use the same approach as you: I transform the data source, create a temporary view, and pass it to the sink, using rowtime along the way. I hit the same error as you. How did you resolve it in the end?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How can flink sql real-time results be compared against offline computation results?

2021-03-08 Thread Smile
Right, the computation semantics of offline and real-time jobs are inherently different, so there is no perfectly clean solution here; it is usually examined case by case.
Some obvious issues, such as whether a Join matched successfully, are still fairly easy to check; other discrepancies are indeed hard to judge.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Hadoop Integration Link broken in downloads page

2021-03-08 Thread Debraj Manna
Hi

It appears the Hadoop Integration link on the downloads page is broken.

Apache Flink® 1.12.2 is our latest stable release.
> If you plan to use Apache Flink together with Apache Hadoop (run Flink on
> YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
> system connector), please check out the Hadoop Integration documentation.


It is throwing 404 Error.

Thanks


Re: Trigger and completed Checkpointing do not appeared

2021-03-08 Thread Smile
Hi,

Could you please change the source to an endless one? For example a Kafka
source or a custom source that implements SourceFunction([1])?
env.readTextFile() doesn't wait for all the data to be processed; it exits
immediately after telling the readers what to read, so the job may finish before
the first checkpoint is triggered. See [2] for more information.

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-
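
A minimal sketch of such an endless custom source (illustrative only; the record
contents and the sleep interval are arbitrary):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class EndlessStringSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long i = 0;
        while (running) {
            // Emit under the checkpoint lock so emission does not interleave
            // with checkpoint snapshots.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("event-" + i++);
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

With a source like this the job keeps running, so periodic checkpoints actually get a
chance to be triggered and completed.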

Regards,
Smile



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Is Ververica Connector Redis open source?

2021-03-08 Thread Yik San Chan
Hi community,

I found this package
https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-redis/1.11-vvr-2.1.3
in Maven Repository. However, I cannot find it anywhere in GitHub. Does
anyone know this is open source or not?

Thank you!

Best,
Yik San Chan


Re: [ANNOUNCE] Apache Flink 1.12.2 released

2021-03-08 Thread Zhu Zhu
Thanks Roman and Yuan for being the release managers! Thanks everyone who
has made this release possible!

Cheers,
Zhu

Piotr Nowojski wrote on Sat, Mar 6, 2021 at 12:38 AM:

> Thanks Roman and Yuan for your work and driving the release process :)
>
> On Fri, Mar 5, 2021 at 15:53, Till Rohrmann wrote:
>
>> Great work! Thanks a lot for being our release managers Roman and Yuan and
>> to everyone who has made this release possible.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 5, 2021 at 10:43 AM Yuan Mei  wrote:
>>
>> > Cheers!
>> >
>> > Thanks, Roman, for doing the most time-consuming and difficult part of
>> the
>> > release!
>> >
>> > Best,
>> >
>> > Yuan
>> >
>> > On Fri, Mar 5, 2021 at 5:41 PM Roman Khachatryan 
>> wrote:
>> >
>> > > The Apache Flink community is very happy to announce the release of
>> > Apache
>> > > Flink 1.12.2, which is the second bugfix release for the Apache Flink
>> > 1.12
>> > > series.
>> > >
>> > > Apache Flink® is an open-source stream processing framework for
>> > > distributed, high-performing, always-available, and accurate data
>> > streaming
>> > > applications.
>> > >
>> > > The release is available for download at:
>> > > https://flink.apache.org/downloads.html
>> > >
>> > > Please check out the release blog post for an overview of the
>> > improvements
>> > > for this bugfix release:
>> > > https://flink.apache.org/news/2021/03/03/release-1.12.2.html
>> > >
>> > > The full release notes are available in Jira:
>> > >
>> > >
>> >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502=12315522
>> > >
>> > > We would like to thank all contributors of the Apache Flink community
>> who
>> > > made this release possible!
>> > >
>> > > Special thanks to Yuan Mei for managing the release and PMC members
>> > Robert
>> > > Metzger, Chesnay Schepler and Piotr Nowojski.
>> > >
>> > > Regards,
>> > > Roman
>> > >
>> >
>>
>


Choosing Flink and Hive versions

2021-03-08 Thread 张锴
Could anyone advise which Flink version has the best compatibility with Hive 3.x and above? I'm currently choosing a Flink version and won't upgrade for a while, so any suggestions would be appreciated.


Re: Re: How to check checkpointing mode

2021-03-08 Thread Yun Gao
Hi Alexey,

Sorry I also do not see problems in the attached code. Could you add
a breakpoint at `see.execute(name)` and have a look at the value of 
see#checkpointCfg#checkpointingMode ?

Best,
Yun
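
An alternative to the breakpoint (just an illustrative one-liner, not from the original
mail) is to log the mode via the public CheckpointConfig accessor right before executing:

// getCheckpointingMode() is the public accessor on CheckpointConfig.
LOGGER.info("Checkpointing mode: {}", see.getCheckpointConfig().getCheckpointingMode());
see.execute(name);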


 --Original Mail --
Sender:Alexey Trenikhun 
Send Date:Tue Mar 9 07:25:31 2021
Recipients:Flink User Mail List , Yun Gao 

Subject:Re: How to check checkpointing mode

Hi Yun,
Thank you for looking. The job creation is quite big, so I've truncated helper methods
dealing with command-line parameters etc. Below are the two major methods:

@Override

public Void call() throws Exception {
  LOGGER.info("{}", new Info().toLog());


  if (!allParameters.isEmpty()) {
// We don't expect any parameters, but Flink 1.12 adds JVM options to job 
args, since we add
// -- after jobs argument, this unnecessary for us arguments will be 
treated as positional
// parameters, which we ignore but log warning
LOGGER.warn("Unexpected parameters: {}", allParameters);
  }
  try {
final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
see.execute(name);
return null;
  } catch (InterruptedException e) {
LOGGER.error("Stream Processor was interrupted", e);
Thread.currentThread().interrupt();
throw e;
  } catch (Exception e) {
LOGGER.error("Stream Processor is terminated due to exception", e);
throw e;
  }
}
private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws 
IOException {
  initDefaultKafkaSource();
  final long deviationMillis = deviation.toMillis();
  final GlobalAppConfig globalAppConfig = config();
  final StreamExecutionEnvironment see = StreamExecutionEnvironment
  .getExecutionEnvironment()
  .enableCheckpointing(checkpointInterval.toMillis(),
  CheckpointingMode.AT_LEAST_ONCE)
  .setMaxParallelism(1024)
  .setParallelism(parallelism);
  if (externalizedCheckpoints) {
see.getCheckpointConfig()

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }
  see.getConfig().disableGenericTypes();
  see.getConfig().disableAutoGeneratedUIDs();
  configureStateBackend(see);

  final Properties producerProperties = new PropertiesBuilder()
  .putAll(kafkaCommonOptions)
  .putAll(kafkaProducerOptions)
  .varFiles(valueFiles)
  .build();

  final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
  .semantic(Semantic.AT_LEAST_ONCE)
  .config(producerProperties)
  .build();

  final AutoTopic autoTopic = AutoTopic.builder()
  .config(producerProperties)
  .partitions(autoCreateTopicsPartitions)
  .replicationFactor(autoCreateTopicsReplicationFactor)
  .doNotCreateTopics(ImmutableSet.of(
  gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
  ))
  .build();

  see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 
Time.minutes(1)));
  // since Flink 1.12 default stream characteristic is event time,
  // so we don't need to set streamTimeCharacteristic, furthermore whole 
TimeCharacteristic enum
  // is deprecated.
  // If needed explicitly using processing-time windows and timers works in 
event-time mode.

  addHeartbeats(see);
  final TStateCleanupOnTimeout.Factory cleanupFactory =
  new TStateCleanupOnTimeout.Factory(
  maxCallDuration,
  postmortemCallDuration,
  globalAppConfig.timerGranularity()
  );

  @Nullable final SingleOutputStreamOperator cfgXform;
  @Nullable final DataStream cfgSource = addSources(see,
  SourceTopic.GCA_CFG,
  new CfgJsonDeserializationSchema(),
  (event, timestamp) -> event.getBatchId(),
  it -> !it.getHeartbeat());

  if (cfgSource != null) {
cfgXform = cfgSource
.keyBy(PbCfgDatum::getCcId)
.process(new CfgTransform())
.uid("xform-cfg")
.name("XForm Config");

if (!isNullOrEmpty(gspCfg)) {
  cfgXform.addSink(producerFactory.create(gspCfg,
  autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg
  .uid("uid-" + gspCfg)
  .name(gspCfg);
} else {
  cfgXform.addSink(new DiscardingSink<>())
  .uid("uid-gsp-cfg-null")
  .name("gsp-cfg-null");
}
  } else {
cfgXform = null;
  }

  final DataStream voiceCallThreadSource = addSources(see,
  SourceTopic.VOICE_CALL_THREAD,
  callThreadFormat == KafkaTopicFormat.JSON
  ? new TJsonDeserializationSchema()
  : new CallEventDeserializationSchema(),
  (event, timestamp) ->
  Instants.PROTO_TIMESTAMP_EPOCH.equals(event.getTimestamp())
  ? timestamp - deviationMillis
  : Instants.toMillis(event.getTimestamp()),
  event -> event.getType() != EventType.EVENT_UNKNOWN);

  final SingleOutputStreamOperator tcmDataStream1 = 
voiceCallThreadSource
  .keyBy(CallEventKey::new)
  .process(new TIntakeProcessFunction(cleanupFactory))
  .returns(PbTypes.TCM_DATUM)
  .uid("intake-voice-calls")
  

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-08 Thread Shengkai Fang
Sorry for the typo...

I mean it will not take too much time.

Best,
Shengkai
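
For readers following the thread, a skeletal source declaring both abilities under
discussion might look like the sketch below (hypothetical, not Yuval's connector; the
runtime provider is stubbed out):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;

public class ExampleScanSource implements ScanTableSource,
        SupportsWatermarkPushDown, SupportsFilterPushDown {

    private WatermarkStrategy<RowData> watermarkStrategy;
    private List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        // Invoked only if the planner pushes the watermark assigner into the scan.
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Accept all filters so the planner may remove the Filter node above the scan.
        this.pushedFilters = new ArrayList<>(filters);
        return Result.of(new ArrayList<>(filters), Collections.emptyList());
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // A real connector would return e.g. a SourceFunctionProvider here.
        throw new UnsupportedOperationException("stubbed out in this sketch");
    }

    @Override
    public DynamicTableSource copy() {
        ExampleScanSource copy = new ExampleScanSource();
        copy.watermarkStrategy = watermarkStrategy;
        copy.pushedFilters = new ArrayList<>(pushedFilters);
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "ExampleScanSource";
    }
}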

Shengkai Fang wrote on Tue, Mar 9, 2021 at 10:25 AM:

> Hi, Yuval.
>
> I have opened a ticket about this[1]. But I don't think we have any
> solution to solve.
>
> Do you have time to help us to solve this? I think it will take too much
> time.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21675
>
> Yuval Itzchakov wrote on Mon, Mar 8, 2021 at 9:18 PM:
>
>> Thank you Shenkai,
>> That does explain what I'm seeing.
>>
>> Jark / Shenkai - Is there any workaround to get Flink to work with push
>> watermarks and predicate pushdown until this is resolved?
>>
>> On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang  wrote:
>>
>>> Hi, Yuval, Jark, Timo.
>>>
>>> Currently the watermark push down happens in the logical rewrite phase
>>> but the filter push down happens in the local phase, which means the
>>> planner will first check the Filter push down and then check the watermark
>>> push down.
>>>
>>> I think we need a rule to transpose between the filter and watermark
>>> assigner or extend the filter push down rule to capture the structure that
>>> the watermark assigner is the parent of the table scan.
>>>
>>> Best,
>>> Shengkai
>>>
>>> Yuval Itzchakov wrote on Mon, Mar 8, 2021 at 12:13 AM:
>>>
 Hi Jark,

 Even after implementing both, I don't see the watermark being pushed to
 the tablesource in the logical plan and avoids predicate pushdown from
 running.

 On Sun, Mar 7, 2021, 15:43 Jark Wu  wrote:

> Hi Yuval,
>
> That's correct you will always get a LogicalWatermarkAssigner if you
> assigned a watermark.
> If you implement SupportsWatermarkPushdown,
> the LogicalWatermarkAssigner will be pushed
> into TableSource, and then you can push Filter into source if source
> implement SupportsFilterPushdown.
>
> Best,
> Jark
>
> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov 
> wrote:
>
>> Hi Timo,
>> After investigating this further, this is actually non related to
>> implementing SupportsWatermarkPushdown.
>>
>> Once I create a TableSchema for my custom source's RowData, and
>> assign it a watermark (see my example in the original mail), the plan 
>> will
>> always include a LogicalWatermarkAssigner. This assigner that is between
>> the LogicalTableScan and the LogicalFilter will then go on and fail the
>> HepPlanner from invoking the optimization since it requires
>> LogicalTableScan to be a direct child of LogicalFilter. Since I have
>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>> work.
>>
>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther 
>> wrote:
>>
>>> Hi Yuval,
>>>
>>> sorry that nobody replied earlier. Somehow your email fell through
>>> the
>>> cracks.
>>>
>>> If I understand you correctly, could would like to implement a table
>>> source that implements both `SupportsWatermarkPushDown` and
>>> `SupportsFilterPushDown`?
>>>
>>> The current behavior might be on purpose. Filters and Watermarks are
>>> not
>>> very compatible. Filtering would also mean that records (from which
>>> watermarks could be generated) are skipped. If the filter is very
>>> strict, we would not generate any new watermarks and the pipeline
>>> would
>>> stop making progress in time.
>>>
>>> Watermark push down is only necessary, if per-partition watermarks
>>> are
>>> required. Otherwise the watermarks are generated in a subsequent
>>> operator after the source. So you can still use rowtime without
>>> implementing `SupportsWatermarkPushDown` in your custom source.
>>>
>>> I will lookp in Shengkai who worked on this topic recently.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>> > Bumping this up again, would appreciate any help if anyone is
>>> familiar
>>> > with the blink planner.
>>> >
>>> > Thanks,
>>> > Yuval.
>>> >
>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov >> > > wrote:
>>> >
>>> > Hi Jark,
>>> > Would appreciate your help with this.
>>> >
>>> > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <
>>> ro...@apache.org
>>> > > wrote:
>>> >
>>> > Hi Yuval,
>>> >
>>> > I'm not familiar with the Blink planner but probably Jark
>>> can help.
>>> >
>>> > Regards,
>>> > Roman
>>> >
>>> >
>>> > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>>> > mailto:yuva...@gmail.com>> wrote:
>>> >
>>> > Update: When I don't set the watermark explicitly on
>>> the
>>> > TableSchema, `applyWatermarkStrategy` never gets
>>> called on
>>> > my ScanTableSource, which does make sense. 

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-08 Thread Shengkai Fang
Hi, Yuval.

I have opened a ticket about this[1]. But I don't think we have any
solution to solve.

Do you have time to help us to solve this? I think it will take too much
time.

[1] https://issues.apache.org/jira/browse/FLINK-21675

Yuval Itzchakov wrote on Mon, Mar 8, 2021 at 9:18 PM:

> Thank you Shenkai,
> That does explain what I'm seeing.
>
> Jark / Shenkai - Is there any workaround to get Flink to work with push
> watermarks and predicate pushdown until this is resolved?
>
> On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang  wrote:
>
>> Hi, Yuval, Jark, Timo.
>>
>> Currently the watermark push down happens in the logical rewrite phase
>> but the filter push down happens in the local phase, which means the
>> planner will first check the Filter push down and then check the watermark
>> push down.
>>
>> I think we need a rule to transpose between the filter and watermark
>> assigner or extend the filter push down rule to capture the structure that
>> the watermark assigner is the parent of the table scan.
>>
>> Best,
>> Shengkai
>>
>> Yuval Itzchakov wrote on Mon, Mar 8, 2021 at 12:13 AM:
>>
>>> Hi Jark,
>>>
>>> Even after implementing both, I don't see the watermark being pushed to
>>> the tablesource in the logical plan and avoids predicate pushdown from
>>> running.
>>>
>>> On Sun, Mar 7, 2021, 15:43 Jark Wu  wrote:
>>>
 Hi Yuval,

 That's correct you will always get a LogicalWatermarkAssigner if you
 assigned a watermark.
 If you implement SupportsWatermarkPushdown,
 the LogicalWatermarkAssigner will be pushed
 into TableSource, and then you can push Filter into source if source
 implement SupportsFilterPushdown.

 Best,
 Jark

 On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov  wrote:

> Hi Timo,
> After investigating this further, this is actually non related to
> implementing SupportsWatermarkPushdown.
>
> Once I create a TableSchema for my custom source's RowData, and assign
> it a watermark (see my example in the original mail), the plan will always
> include a LogicalWatermarkAssigner. This assigner that is between the
> LogicalTableScan and the LogicalFilter will then go on and fail the
> HepPlanner from invoking the optimization since it requires
> LogicalTableScan to be a direct child of LogicalFilter. Since I have
> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
> work.
>
> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther 
> wrote:
>
>> Hi Yuval,
>>
>> sorry that nobody replied earlier. Somehow your email fell through
>> the
>> cracks.
>>
>> If I understand you correctly, could would like to implement a table
>> source that implements both `SupportsWatermarkPushDown` and
>> `SupportsFilterPushDown`?
>>
>> The current behavior might be on purpose. Filters and Watermarks are
>> not
>> very compatible. Filtering would also mean that records (from which
>> watermarks could be generated) are skipped. If the filter is very
>> strict, we would not generate any new watermarks and the pipeline
>> would
>> stop making progress in time.
>>
>> Watermark push down is only necessary, if per-partition watermarks
>> are
>> required. Otherwise the watermarks are generated in a subsequent
>> operator after the source. So you can still use rowtime without
>> implementing `SupportsWatermarkPushDown` in your custom source.
>>
>> I will lookp in Shengkai who worked on this topic recently.
>>
>> Regards,
>> Timo
>>
>>
>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>> > Bumping this up again, would appreciate any help if anyone is
>> familiar
>> > with the blink planner.
>> >
>> > Thanks,
>> > Yuval.
>> >
>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov > > > wrote:
>> >
>> > Hi Jark,
>> > Would appreciate your help with this.
>> >
>> > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <
>> ro...@apache.org
>> > > wrote:
>> >
>> > Hi Yuval,
>> >
>> > I'm not familiar with the Blink planner but probably Jark
>> can help.
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>> > mailto:yuva...@gmail.com>> wrote:
>> >
>> > Update: When I don't set the watermark explicitly on the
>> > TableSchema, `applyWatermarkStrategy` never gets called
>> on
>> > my ScanTableSource, which does make sense. But now the
>> > question is what should be done? This feels a bit
>> unintuitive.
>> >
>> > On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
>> > mailto:yuva...@gmail.com>> wrote:
>> >
>> > Hi,
>> > 

Re: How to integrate Flink jobs into our own system for easier management

2021-03-08 Thread Jacob
Thanks for the answers.
I'll first try submitting jobs with classes like YarnClusterDescriptor and see how that works out.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to integrate Flink jobs into our own system for easier management

2021-03-08 Thread tison
>Does that mean I can use the relevant APIs to read a jar and submit a job, with the target cluster also passed into the code via configuration parameters? Is that roughly
the process? Is there any related documentation or demo? I haven't been able to find anything online.

Yes. The publicly supported API today is the command line; internally it is driven by a series of classes such as ClusterDescriptor and CliFrontend.
For customization, people usually program directly against these internal classes, but they are not public interfaces and may change at any time. There is no better way at the moment.

>For integration with our own system, would we embed these pages as hyperlinks, e.g. click a button on the system dashboard and jump to a specific module of the Flink web UI?

There are many ways to do this integration: embedding the pages inside your own pages, redirecting from one page to another, directly customizing the Flink web module, or calling the REST
API from fully self-developed pages, and so on.

Best,
tison.
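
As a concrete illustration of the "call the REST API" option above (a hedged sketch:
the host, port, and response handling are placeholders; /jobs/overview is one of the
documented JobManager REST endpoints):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListFlinkJobs {
    public static void main(String[] args) throws Exception {
        // Returns a JSON overview of all jobs and their current states.
        URL url = new URL("http://jobmanager-host:8081/jobs/overview");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}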


Jacob <17691150...@163.com> wrote on Tue, Mar 9, 2021 at 9:42 AM:

> Thanks for the pointers; following the idea of programming against the interfaces, I just found some articles and demos.
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: How to integrate Flink jobs into our own system for easier management

2021-03-08 Thread Jacob
Thanks for the pointers; following the idea of programming against the interfaces, I just found some articles and demos.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [flink sql-client reading/writing Kerberos-authenticated Hive]

2021-03-08 Thread guoyb
OK, thanks!



---Original Mail---
From: "Rui Li": this is probably related to https://issues.apache.org/jira/browse/FLINK-20913;
that issue was fixed in 1.12.2, so you could try upgrading.

On Mon, Mar 8, 2021 at 2:15 PM guoyb <861277...@qq.com> wrote:

 Hello!
 hive.metastore.sasl.enabled is true


 When starting the sql client, the authentication info is read correctly and the table names in the metastore can be listed.


 Reading and writing, however, fail authentication.



 ---Original Mail---
 From: "Rui Li"

What can cause 'Could not connect to BlobServer'?

2021-03-08 Thread macdoor
Running a 1.12.2 standalone cluster, errors like the following occasionally appear. What could be causing this? Thanks!

Caused by: java.io.IOException: Failed to fetch BLOB
fb90d0fce9ff3ad8353ea97e46f9c913/p-bc0d39187ed200f9df64f90463534862858961a2-2ff77a5adb95af29376c6699173c3969
from hb3-dev-gem-svc1-000/10.30.69.13:43003 and store it under
/home/gum/flink_tmp/blobStore-e82a4a09-0f9c-4846-902c-b18c6fd09dae/incoming/temp-1153
at
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:167)
at
org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:166)
at
org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:187)
at
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.createUserCodeClassLoader(BlobLibraryCacheManager.java:251)
at
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:228)
at
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:199)
at
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:333)
at
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:983)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:632)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not connect to BlobServer at address
hb3-dev-gem-svc1-000/10.30.69.13:43003
at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:102)
at
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:137)
... 10 more
Caused by: java.net.UnknownHostException: hb3-dev-gem-svc1-000
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:96)
... 11 more




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: How to aggregate data with the same pv using a session window in pyflink

2021-03-08 Thread kk
I tested the sliding window before and it works. It just doesn't work with a session window: the group windowed table doesn't support it.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Trigger and completed Checkpointing do not appeared

2021-03-08 Thread Alexey Trenikhun
The picture in the first e-mail shows that the job completed in 93 ms


From: Abdullah bin Omar 
Sent: Monday, March 8, 2021 3:53 PM
To: user@flink.apache.org 
Subject: Re: Trigger and completed Checkpointing do not appeared

Hi,

Please read the previous email (and also this email) to answer me.

In the attached pic, the interval shows 1 s, and the job finished in 1939 ms.

According to the code in the previous email, at least some checkpoints should have been
triggered and completed.

However, the Apache Flink UI showed no triggered or completed checkpoints (see the
attached pic in the first email).

What is the problem? Why are checkpoints not completing here?

Thank you




On Mon, Mar 8, 2021 at 3:07 PM Abdullah bin Omar 
mailto:abdullahbinoma...@gmail.com>> wrote:
Hi,

I ran a sample word-count job. The input is just some text, and it produces output in
which the words are counted. In the code I added all the lines necessary to enable
checkpointing. However, I did not see any triggered or completed checkpoints (see the
attached pic), even though the word count still works.

The code is below:


package org.apache.flink.flink_quickstart_java;



import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.utils.MultipleParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

//import org.apache.flink.streaming.examples.wordcount.util.WordCountData;

import org.apache.flink.util.Collector;

import org.apache.flink.util.Preconditions;


import org.apache.flink.streaming.api.CheckpointingMode;

import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;



public class StreamingJob {




// *

// PROGRAM

// *


public static void main(String[] args) throws Exception {


// Checking input parameters

final MultipleParameterTool params = 
MultipleParameterTool.fromArgs(args);


// set up the execution environment

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();



 // start a checkpoint every 1000 ms

 env.enableCheckpointing(1000);



 // to set minimum progress time to happen between checkpoints

 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);



 // checkpoints have to complete within 1 ms, or are discarded

 env.getCheckpointConfig().setCheckpointTimeout(1);



 // set mode to exactly-once (this is the default)

 
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
 // AT_LEAST_ONCE



 // allow only one checkpoint to be in progress at the same time

 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


 // enable externalized checkpoints which are retained after job 
cancellation

 
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  // DELETE_ON_CANCELLATION



 //StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

 env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, 100 ));

// number of restart 
attempts , delay in each restart



// make parameters available in the web interface

env.getConfig().setGlobalJobParameters(params);


// get input data

DataStream<String> text = null;

if (params.has("input")) {

// union all the inputs from text files

for (String input : params.getMultiParameterRequired("input")) {

if (text == null) {

text = env.readTextFile(input);

} else {

text = text.union(env.readTextFile(input));

}

}

Preconditions.checkNotNull(text, "Input DataStream should not be 
null.");

} else {

System.out.println("Executing WordCount example with default input 
data set.");

System.out.println("Use --input to specify file input.");

// get default test text data

//text = env.fromElements(WordCountData.WORDS);

}


DataStream<Tuple2<String, Integer>> counts =

// split up the lines in pairs (2-tuples) containing: (word,1)

text.flatMap(new Tokenizer())

// group by the tuple field "0" and sum up tuple field 
"1"

.keyBy(value -> value.f0)

.sum(1);


// emit result

if (params.has("output")) {

Re: How to integrate Flink jobs into our own system for easier management

2021-03-08 Thread Jacob
Thanks for the reply!

Sorry for the late reply; I was busy these past couple of days.

My Flink jobs run on a Hadoop cluster, i.e. in on-YARN mode.
Regarding what you said:


1. [Program against the FLINK and YARN or k8s interfaces, manage metadata, manage user files, support submitting jobs and then managing job status]: does that mean I can use the relevant APIs to read a jar and submit a job, with the target cluster also passed into the code via configuration parameters? Is that roughly the process? Is there any related documentation or demo? I haven't been able to find anything online.


2. [Flink itself has a web frontend that supports most of what you need]: I know this, and I often open the web UI to check logs. For integration with our own system, would we embed these pages as hyperlinks, e.g. click a button on the system dashboard and jump to a specific module of the Flink web UI?



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Call for Presentations for ApacheCon 2021 now open

2021-03-08 Thread Rich Bowen
[Note: You are receiving this because you are subscribed to a users@ 
list on one or more Apache Software Foundation projects.]


The ApacheCon Planners and the Apache Software Foundation are pleased to 
announce that ApacheCon@Home will be held online, September 21-23, 2021. 
Once again, we’ll be featuring content from dozens of our projects, as 
well as content about our community, how Apache works, business models 
around Apache software, the legal aspects of open source, and many other 
topics.


Last year’s virtual ApacheCon@Home event was a big success, with 5,745 
registrants from more than 150 countries, spanning every time zone, with 
the virtual format delivering content to attendees who would never have 
attended an in-person ApacheCon (83% of post-event poll responders in 
2020 indicated this was their first ApacheCon ever)!


Given the great participation and excitement for last year’s event, we 
are announcing the Call for Presentations is now open to presenters from 
around the world until May 1st. Talks can be focused on the topics 
above, as well as any of our amazing projects. Submit your talks today!


https://www.apachecon.com/acah2021/cfp.html

We look forward to reviewing your contribution to one of the most 
popular open source software events in the world!



Rich, for the ApacheCon Planners

--
Rich Bowen, VP Conferences
The Apache Software Foundation
https://apachecon.com/
@apachecon


Re: Stateful functions 2.2 and stop with savepoint

2021-03-08 Thread Meissner, Dylan
Thank you for this information, Piotr.

The comment from Igal Shilman in FLINK-18894 issue says, "Obtaining a MAX_PRIO 
mailbox from StreamTask, solves this issue." I'm unclear what this means -- is 
this a workaround I can leverage?

Dylan

From: Piotr Nowojski 
Sent: Thursday, March 4, 2021 7:03 AM
To: Kezhu Wang 
Cc: Meissner, Dylan ; user@flink.apache.org 

Subject: Re: Stateful functions 2.2 and stop with savepoint

It doesn't change much ;) There is a known issue of stopping with savepoint and 
stateful functions not working [1]. The difference is that this one we will 
probably want to tackle sooner or later. Old streaming iterations are probably 
dead..

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18894

czw., 4 mar 2021 o 15:56 Kezhu Wang mailto:kez...@gmail.com>> 
napisał(a):
Hi all,

My BAD!!!

Sorry for apparent mess up in that moment.

I will write a separate test for stream iterations.


The stateful function part should be a separated issue.


Best,
Kezhu Wang



On March 4, 2021 at 22:13:48, Piotr Nowojski 
(piotr.nowoj...@gmail.com) wrote:

Hi Meissner,

Can you clarify, are you talking about stateful functions? [1] Or the stream 
iterations [2]? The first e-mail suggests stateful functions, but the ticket 
that Kezhu created is talking about the latter.

Piotrek

[1] https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#iterations



niedz., 28 lut 2021 o 15:33 Kezhu Wang 
mailto:kez...@gmail.com>> napisał(a):
Hi,

You could also try `cancel —withSavepoint [savepointDir]` even it is in 
deprecation. Comparing to take-savepoints and then cancel approach, there will 
be no checkpoints in between. This may be important if there are two phase 
commit operators in your job.


Best,
Kezhu Wang



On February 28, 2021 at 20:50:29, Meissner, Dylan 
(dylan.t.meiss...@nordstrom.com) wrote:

Thank you for opening the bug and including the extra context.

I'll track the progress and, in the meantime, I will work around by taking two 
separate actions when stopping job: take-savepoints, then cancel.

From: Kezhu Wang mailto:kez...@gmail.com>>
Sent: Sunday, February 28, 2021 12:31 AM
To: user@flink.apache.org 
mailto:user@flink.apache.org>>; Meissner, Dylan 
mailto:dylan.t.meiss...@nordstrom.com>>
Subject: Re: Stateful functions 2.2 and stop with savepoint

Hi,

Thanks for reporting. I think it is a Flink bug and have created FLINK-21522 
for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang


On February 28, 2021 at 00:59:04, Meissner, Dylan 
(dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented as 
this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required to 
use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema, 
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain 
", the operation never completes, reporting IN_PROGRESS until I hit the 
"failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException: 
Checkpoint expired before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed their 
work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | 
end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% | 
end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?


Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Rainie Li
Thanks for the info, David.
The job has checkpointing.
I saw some tasks failed due to
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka"
Here is stacktrack from JM log:

container_e17_1611597945897_8007_01_000240 @ worker-node-host
(dataPort=42321).
2021-02-10 01:19:27,206 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure reason:
Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
at
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
Failed to send data to Kafka: Expiring 42 record(s) for topic-name-38:
116447 ms has passed since last append
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
... 18 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42
record(s) for frontend_event_core-38: 116447 ms has passed since last append
2021-02-10 01:19:27,216 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
(7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
at
org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at

Flink Read S3 Intellij IDEA Error

2021-03-08 Thread sri hari kali charan Tummala
> Hi Flink Experts,
>

I am trying to read an S3 file from my IntelliJ setup using Flink, and I am coming
> across an AWS auth error. Can someone help? Below are all the details.
>


> I have Aws credentials in homefolder/.aws/credentials
>

My Intellij Environment Variables:-
> ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
>
> FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
>
> flink-conf.yaml file content:-
>
> fs.hdfs.hadoopconf: 
> /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
>
> core-site.xml file content:-
>
> <?xml version="1.0" encoding="UTF-8"?>
> <configuration>
>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>
>   <property>
>     <name>fs.s3.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>
>   <property>
>     <name>fs.s3a.server-side-encryption-algorithm</name>
>     <value>AES256</value>
>   </property>
>
>   <property>
>     <name>fs.s3a.aws.credentials.provider</name>
>     <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>   </property>
>
>   <property>
>     <name>fs.s3a.access.key</name>
>     <value></value>
>   </property>
>
>   <property>
>     <name>fs.s3a.secret.key</name>
>     <value></value>
>   </property>
>
>   <property>
>     <name>fs.s3a.session.token</name>
>     <value></value>
>   </property>
>
>   <property>
>     <name>fs.s3a.proxy.host</name>
>     <value></value>
>   </property>
>
>   <property>
>     <name>fs.s3a.proxy.port</name>
>     <value>8099</value>
>   </property>
>
>   <property>
>     <name>fs.s3a.proxy.username</name>
>     <value></value>
>   </property>
>
>   <property>
>     <name>fs.s3a.proxy.password</name>
>     <value></value>
>   </property>
>
> </configuration>
>
> POM.xml file:-
>
> 
> http://maven.apache.org/POM/4.0.0;
>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
> 4.0.0
>
> FlinkStreamAndSql
> FlinkStreamAndSql
> 1.0-SNAPSHOT
> 
> src/main/scala
> 
> 
> 
> net.alchim31.maven
> scala-maven-plugin
> 3.1.3
> 
> 
> 
> compile
> testCompile
> 
> 
> 
> 
> 
> 
> 
> org.apache.maven.plugins
> maven-surefire-plugin
> 2.13
> 
> false
> true
> 
> 
> 
> **/*Test.*
> **/*Suite.*
> 
> 
> 
>
> 
> 
> maven-assembly-plugin
> 2.4.1
> 
> 
> jar-with-dependencies
> 
> 
> 
> 
> make-assembly
> package
> 
> single
> 
> 
> 
> 
> 
> 
> 
>
> 
> org.apache.flink
> flink-core
> 1.8.1
> 
>
> 
> org.apache.flink
> flink-core
> 1.8.1
> 
>
> 
> org.apache.flink
> flink-clients_2.11
> 1.8.1
> 
>
> 
> org.apache.derby
> derby
> 10.13.1.1
> 
>
> 
> org.apache.flink
> flink-jdbc_2.11
> 1.8.1
> 
>
> 
> org.apache.flink
> flink-table-api-scala_2.11
> 1.8.1
> 
>
> 
> org.apache.flink
> flink-table-api-java
> 1.8.1
> 
>
>
> 
> org.apache.flink
> flink-table
> 1.8.1
> 
>
> 
> org.apache.flink
> flink-table-planner_2.11
> 1.8.1
> 
>
>
> 
> org.apache.flink
> flink-json
> 1.8.1
> 
>
> 
> org.apache.flink
> flink-scala_2.11
> 1.8.1
> 
>
>
>org.apache.flink
>flink-scala_2.11
>1.8.1
>
>
>
>org.apache.flink
>flink-streaming-scala_2.11
>1.8.1
>
>
>
>org.apache.flink
>flink-connector-kinesis_2.11
>1.8.0
>system
>
> ${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar
>
>
>
>org.apache.flink
>flink-connector-kafka-0.11_2.11
>1.8.1
>
>
>
>com.amazonaws
>amazon-kinesis-client
>1.8.8
>
>
>
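
For reference, once the filesystem and credentials resolve, reading the file itself is a
one-liner. This is a hedged sketch assuming flink-s3-fs-hadoop (or hadoop-aws plus the AWS
SDK) is on the IDE classpath and that the bucket/key below are placeholders:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.readTextFile("s3a://my-bucket/path/to/file.csv")
       .print();
    env.execute("read-s3-from-ide");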

Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread David Anderson
Rainie,

A restart after a failure can cause data loss if you aren't using
checkpointing, or if you experience a transaction timeout.

A manual restart can also lead to data loss, depending on how you manage
the offsets, transactions, and other state during the restart. What
happened in this case?
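
For reference, a hedged sketch of the combination this refers to (checkpointing plus an
exactly-once Kafka sink); topic, broker and timeout values are illustrative, and the job
has to be restarted from a checkpoint or savepoint for the guarantee to hold:

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "broker:9092");
    // must stay below the broker's transaction.max.timeout.ms and above the checkpoint
    // interval, otherwise pre-committed transactions can time out and be lost on restart
    producerProps.setProperty("transaction.timeout.ms", "900000");

    FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
        "output-topic",
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                return new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));
            }
        },
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);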

David

On Mon, Mar 8, 2021 at 7:53 PM Rainie Li  wrote:

> Thanks Yun and David.
> There were some tasks that got restarted. We configured the restart policy
> and the job didn't fail.
> Will task restart cause data loss?
>
> Thanks
> Rainie
>
>
> On Mon, Mar 8, 2021 at 10:42 AM David Anderson 
> wrote:
>
>> Rainie,
>>
>> Were there any failures/restarts, or is this discrepancy observed without
>> any disruption to the processing?
>>
>> Regards,
>> David
>>
>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li  wrote:
>>
>>> Thanks for the quick response, Smile.
>>> I don't use window operators or flatmap.
>>> Here is the core logic of my filter, it only iterates on filters list.
>>> Will *rebalance() *cause it?
>>>
>>> Thanks again.
>>> Best regards
>>> Rainie
>>>
>>> SingleOutputStreamOperator> 
>>> matchedRecordsStream =
>>> eventStream
>>> .rebalance()
>>> .process(new ProcessFunction>() {
>>>   public void processElement(
>>>   T element,
>>>   ProcessFunction>.Context 
>>> context,
>>>   Collector> collector) {
>>> for (StreamFilter filter : filters) {
>>>   if (filter.match(element)) {
>>> SubstreamConfig substreamConfig = 
>>> filter.getSubstreamConfig();
>>> SplitterIntermediateRecord result = new 
>>> SplitterIntermediateRecord<>(
>>> substreamConfig.getKafkaCluster(),
>>> substreamConfig.getKafkaTopic(),
>>> substreamConfig.getCutoverKafkaTopic(),
>>> substreamConfig.getCutoverTimestampInMs(),
>>> element);
>>> collector.collect(result);
>>>   }
>>> }
>>>   }
>>> })
>>> .name("Process-" + eventClass.getSimpleName());
>>>
>>>
>>> On Mon, Mar 8, 2021 at 1:03 AM Smile  wrote:
>>>
 Hi Rainie,

 Could you please provide more information about your processing logic?
 Do you use window operators?
 If there's no time-based operator in your logic, late arrival data
 won't be
 dropped by default and there might be something wrong with your flat
 map or
 filter operator. Otherwise, you can use sideOutputLateData() to get the
 late
 data of the window and have a look at them. See [1] for more information
 about sideOutputLateData().

 [1].

 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

 Regards,
 Smile



 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

>>>


Re: Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-08 Thread Kevin Lam
Awesome, thanks Shuiqiang! I was able to get an example running by
referencing your configs.

On Sat, Mar 6, 2021 at 7:12 AM Shuiqiang Chen  wrote:

> Hi Kevin,
>
> For your information, below is an example for running a PyFlink table API
> WordCount job.
>
> 1. Building a Docker image with Python and PyFlink Installed:
>
> Dockerfile:
>
> FROM flink:1.12.0
>
>
> # install python3 and pip3
> RUN apt-get update -y && \
> apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf
> /var/lib/apt/lists/*
> RUN ln -s /usr/bin/python3 /usr/bin/python
>
> # install Python Flink
>
> RUN pip3 install apache-flink==1.12.0
>
> 2. Resource definitions:
>
> Flink-configuration-configmap.yaml:
>
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config
>   labels:
> app: flink
> data:
>   flink-conf.yaml: |+
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 2
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> queryable-state.proxy.ports: 6125
> jobmanager.memory.process.size: 1600m
> taskmanager.memory.process.size: 1728m
> parallelism.default: 2
>   log4j-console.properties: |+
> # This affects logging for both user code and Flink
> rootLogger.level = INFO
> rootLogger.appenderRef.console.ref = ConsoleAppender
> rootLogger.appenderRef.rolling.ref = RollingFileAppender
>
> # Uncomment this if you want to _only_ change Flink's logging
> #logger.flink.name = org.apache.flink
> #logger.flink.level = INFO
>
> # The following lines keep the log level of common
> libraries/connectors on
> # log level INFO. The root logger does not override this. You have to
> manually
> # change the log levels here.
> logger.akka.name = akka
> logger.akka.level = INFO
> logger.kafka.name= org.apache.kafka
> logger.kafka.level = INFO
> logger.hadoop.name = org.apache.hadoop
> logger.hadoop.level = INFO
> logger.zookeeper.name = org.apache.zookeeper
> logger.zookeeper.level = INFO
>
> # Log all infos to the console
> appender.console.name = ConsoleAppender
> appender.console.type = CONSOLE
> appender.console.layout.type = PatternLayout
> appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
> %-60c %x - %m%n
>
> # Log all infos in the given rolling file
> appender.rolling.name = RollingFileAppender
> appender.rolling.type = RollingFile
> appender.rolling.append = false
> appender.rolling.fileName = ${sys:log.file}
> appender.rolling.filePattern = ${sys:log.file}.%i
> appender.rolling.layout.type = PatternLayout
> appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
> %-60c %x - %m%n
> appender.rolling.policies.type = Policies
> appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
> appender.rolling.policies.size.size=100MB
> appender.rolling.strategy.type = DefaultRolloverStrategy
> appender.rolling.strategy.max = 10
>
> # Suppress the irrelevant (wrong) warnings from the Netty channel
> handler
> logger.netty.name =
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
> logger.netty.level = OFF
>
> Job-manager-service.yaml:
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
> spec:
>   type: ClusterIP
>   ports:
>   - name: rpc
> port: 6123
>   - name: blob-server
> port: 6124
>   - name: webui
> port: 8081
>   selector:
> app: flink
> component: jobmanager
>
> Job-manager.yaml
>
> apiVersion: batch/v1
> kind: Job
> metadata:
>   name: flink-jobmanager
> spec:
>   template:
> metadata:
>   labels:
> app: flink
> component: jobmanager
> spec:
>   restartPolicy: OnFailure
>   containers:
> - name: jobmanager
>   image: pyflink:v1
>   env:
>   args: ["standalone-job", "-py",
> "/opt/flink/examples/python/table/batch/word_count.py"]
>   ports:
> - containerPort: 6123
>   name: rpc
> - containerPort: 6124
>   name: blob-server
> - containerPort: 8081
>   name: webui
>   livenessProbe:
> tcpSocket:
>   port: 6123
> initialDelaySeconds: 30
> periodSeconds: 60
>   volumeMounts:
> - name: flink-config-volume
>   mountPath: /opt/flink/conf
>   securityContext:
> runAsUser:   # refers to user _flink_ from official flink
> image, change if necessary
>   volumes:
> - name: flink-config-volume
>   configMap:
> name: flink-config
> items:
>   - key: flink-conf.yaml
> path: flink-conf.yaml
>   - key: log4j-console.properties
> path: log4j-console.properties
>
> Task-manager.yaml
>
> apiVersion: apps/v1
> kind: Deployment
> metadata:

Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Rainie Li
Thanks Yun and David.
There were some tasks that got restarted. We configured the restart policy
and the job didn't fail.
Will task restart cause data loss?

Thanks
Rainie


On Mon, Mar 8, 2021 at 10:42 AM David Anderson  wrote:

> Rainie,
>
> Were there any failures/restarts, or is this discrepancy observed without
> any disruption to the processing?
>
> Regards,
> David
>
> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li  wrote:
>
>> Thanks for the quick response, Smile.
>> I don't use window operators or flatmap.
>> Here is the core logic of my filter, it only iterates on filters list.
>> Will *rebalance() *cause it?
>>
>> Thanks again.
>> Best regards
>> Rainie
>>
>> SingleOutputStreamOperator> 
>> matchedRecordsStream =
>> eventStream
>> .rebalance()
>> .process(new ProcessFunction>() {
>>   public void processElement(
>>   T element,
>>   ProcessFunction>.Context 
>> context,
>>   Collector> collector) {
>> for (StreamFilter filter : filters) {
>>   if (filter.match(element)) {
>> SubstreamConfig substreamConfig = 
>> filter.getSubstreamConfig();
>> SplitterIntermediateRecord result = new 
>> SplitterIntermediateRecord<>(
>> substreamConfig.getKafkaCluster(),
>> substreamConfig.getKafkaTopic(),
>> substreamConfig.getCutoverKafkaTopic(),
>> substreamConfig.getCutoverTimestampInMs(),
>> element);
>> collector.collect(result);
>>   }
>> }
>>   }
>> })
>> .name("Process-" + eventClass.getSimpleName());
>>
>>
>> On Mon, Mar 8, 2021 at 1:03 AM Smile  wrote:
>>
>>> Hi Rainie,
>>>
>>> Could you please provide more information about your processing logic?
>>> Do you use window operators?
>>> If there's no time-based operator in your logic, late arrival data won't
>>> be
>>> dropped by default and there might be something wrong with your flat map
>>> or
>>> filter operator. Otherwise, you can use sideOutputLateData() to get the
>>> late
>>> data of the window and have a look at them. See [1] for more information
>>> about sideOutputLateData().
>>>
>>> [1].
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>
>>> Regards,
>>> Smile
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread David Anderson
Rainie,

Were there any failures/restarts, or is this discrepancy observed without
any disruption to the processing?

Regards,
David

On Mon, Mar 8, 2021 at 10:14 AM Rainie Li  wrote:

> Thanks for the quick response, Smile.
> I don't use window operators or flatmap.
> Here is the core logic of my filter, it only iterates on filters list.
> Will *rebalance() *cause it?
>
> Thanks again.
> Best regards
> Rainie
>
> SingleOutputStreamOperator> 
> matchedRecordsStream =
> eventStream
> .rebalance()
> .process(new ProcessFunction>() {
>   public void processElement(
>   T element,
>   ProcessFunction>.Context 
> context,
>   Collector> collector) {
> for (StreamFilter filter : filters) {
>   if (filter.match(element)) {
> SubstreamConfig substreamConfig = filter.getSubstreamConfig();
> SplitterIntermediateRecord result = new 
> SplitterIntermediateRecord<>(
> substreamConfig.getKafkaCluster(),
> substreamConfig.getKafkaTopic(),
> substreamConfig.getCutoverKafkaTopic(),
> substreamConfig.getCutoverTimestampInMs(),
> element);
> collector.collect(result);
>   }
> }
>   }
> })
> .name("Process-" + eventClass.getSimpleName());
>
>
> On Mon, Mar 8, 2021 at 1:03 AM Smile  wrote:
>
>> Hi Rainie,
>>
>> Could you please provide more information about your processing logic?
>> Do you use window operators?
>> If there's no time-based operator in your logic, late arrival data won't
>> be
>> dropped by default and there might be something wrong with your flat map
>> or
>> filter operator. Otherwise, you can use sideOutputLateData() to get the
>> late
>> data of the window and have a look at them. See [1] for more information
>> about sideOutputLateData().
>>
>> [1].
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> Regards,
>> Smile
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


idleTimeMsPerSecond on Flink 1.9?

2021-03-08 Thread Lakshmi Gururaja Rao
Hi

I'm trying to understand the implementation of idleTimeMsPerSecond.
Specifically what I'm trying to do is, adapt this metric to be used with
Flink 1.9 (for a fork).

I tried an approach similar to this PR
 and measuring the time
to request a new buffer is easy to adapt but I've found that there's a
difference in the way the mailbox loop runs in 1.9 vs. 1.12.1 and I end up
with under-reported values (for example a sink getting no data, reports
idleTimeMsPerSecond as 0 always which doesn't seem right).

I see that the threading model in StreamTask has changed significantly
after 1.9. Specifically, I think in Flink 1.9 there's no blocking Mailbox
loop as in the later versions (example
)
which is where the idle time is measured..

Maybe I'm missing something, but I guess I can't directly use the same
approach to measure idle time in 1.9? If so, I guess an alternative (more
expensive) approach may be to measure it when the task thread processes
records (like somewhere in this block
)
but I'm not sure if that would be the right/efficient thing to do..

Any suggestions on how to accurately measure task idle time in Flink 1.9?
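
Not tied to Flink's internals at all, but for reference a hand-rolled sketch of how idle
time per second could be accumulated around a blocking call, assuming the call site can be
wrapped; the metric/reporting wiring is left out and all names are made up:

    import java.util.concurrent.Callable;

    public final class IdleTimeTracker {

        private long currentSecond;
        private long idleNanosInCurrentSecond;

        /** Runs a blocking call and books its wall-clock duration as idle time. */
        public <T> T measureIdle(Callable<T> blockingCall) throws Exception {
            long start = System.nanoTime();
            try {
                return blockingCall.call();
            } finally {
                record(start, System.nanoTime());
            }
        }

        private void record(long startNanos, long endNanos) {
            long second = endNanos / 1_000_000_000L;
            if (second != currentSecond) {
                // a real implementation would publish idleNanosInCurrentSecond to a metric here
                currentSecond = second;
                idleNanosInCurrentSecond = 0L;
            }
            idleNanosInCurrentSecond += (endNanos - startNanos);
        }

        /** Idle time booked in the current one-second bucket, in milliseconds. */
        public long idleTimeMsPerSecond() {
            return idleNanosInCurrentSecond / 1_000_000L;
        }
    }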

--
Lakshmi


Re: Gradually increasing checkpoint size

2021-03-08 Thread Dan Hill
Hi Yun!

Thanks for the quick reply.

One of the lowerBounds is large but the table being joined with is ~500
rows.  I also have my own operator that only outputs the first value.

public class OnlyFirstUser<T> extends RichFlatMapFunction<T, T> {


private transient ValueState<Boolean> alreadyOutputted;


@Override

public void flatMap(T value, Collector<T> out) throws Exception {

if (!alreadyOutputted.value()) {

alreadyOutputted.update(true);

out.collect(value);

}

}


@Override

public void open(Configuration config) {

ValueStateDescriptor<Boolean> descriptor =

new ValueStateDescriptor<>(

"alreadyOutputted", // the state name

TypeInformation.of(new TypeHint<Boolean>() {}), //
type information

false); // default value of the state, if nothing
was set

alreadyOutputted = getRuntimeContext().getState(descriptor);

}

}

All of my inputs have this watermark strategy.  In the Flink UI, early in
the job run, I see "Low Watermarks" on each node and they increase.  After
some checkpoint failures, low watermarks stop appearing in the UI

.


.assignTimestampsAndWatermarks(


WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));



Thanks Yun!


On Mon, Mar 8, 2021 at 7:27 AM Yun Gao  wrote:

> Hi Dan,
>
> Have you use a too large upperBound or lowerBound?
>
> If not, could you also check the watermark strategy ?
> The interval join operator depends on the event-time
> timer for cleanup, and the event-time timer would be
> triggered via watermark.
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Dan Hill 
> *Send Date:*Mon Mar 8 14:59:48 2021
> *Recipients:*user 
> *Subject:*Gradually increasing checkpoint size
>
>> Hi!
>>
>> I'm running a backfill Flink stream job over older data.  It has multiple
>> interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd
>> expect my checkpoints to stabilize and not grow.
>>
>> Is there a setting to prune useless data from the checkpoint?  My top
>> guess is that my checkpoint has a bunch of useless state in it.
>>
>> - Dan
>>
>


failure checkpoint counts

2021-03-08 Thread Abdullah bin Omar
Hi,

I faced this exception at the time of checkpoint counts. Could you please
inform me what the problem is here?

the exception:

org.apache.flink.runtime.JobException: Recovery is suppressed by
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3,
backoffTimeMS=100)

at org.apache.flink.runtime.executiongraph.failover.flip1.
ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:130)

at org.apache.flink.runtime.executiongraph.failover.flip1.
ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler
.java:81)

at org.apache.flink.runtime.scheduler.DefaultScheduler
.handleTaskFailure(DefaultScheduler.java:221)

at org.apache.flink.runtime.scheduler.DefaultScheduler
.maybeHandleTaskFailure(DefaultScheduler.java:212)

at org.apache.flink.runtime.scheduler.DefaultScheduler
.updateTaskExecutionStateInternal(DefaultScheduler.java:203)

at org.apache.flink.runtime.scheduler.SchedulerBase
.updateTaskExecutionState(SchedulerBase.java:696)

at org.apache.flink.runtime.scheduler.SchedulerNG
.updateTaskExecutionState(SchedulerNG.java:80)

at org.apache.flink.runtime.jobmaster.JobMaster
.updateTaskExecutionState(JobMaster.java:433)

at jdk.internal.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)

at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:564)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
AkkaRpcActor.java:305)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
AkkaRpcActor.java:212)

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
.handleRpcMessage(FencedAkkaRpcActor.java:77)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
AkkaRpcActor.java:158)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
.java:107)

Caused by: java.net.ConnectException: Connection refused

at java.base/sun.nio.ch.Net.connect0(Native Method)

at java.base/sun.nio.ch.Net.connect(Net.java:574)

at java.base/sun.nio.ch.Net.connect(Net.java:563)

at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:588)

at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:333)

at java.base/java.net.Socket.connect(Socket.java:648)

at org.apache.flink.streaming.api.functions.source.
SocketTextStreamFunction.run(SocketTextStreamFunction.java:104)

at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:110)

at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:66)

at org.apache.flink.streaming.runtime.tasks.
SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)



Thank you!
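
The bottom of the trace ("Caused by: java.net.ConnectException: Connection refused" inside
SocketTextStreamFunction) means the socket source could not reach its host/port, the task
failed, and after the configured restart attempts the job gave up. A hedged sketch of the
usual setup; host and port are assumptions, and something must already be listening on them
(for example `nc -lk 9999`) before the job starts:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.socketTextStream("localhost", 9999)   // the server must be up before the job starts
       .map(String::toUpperCase)
       .print();
    env.execute("socket-source-example");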


Re: New settings are not honored unless checkpoint is cleared.

2021-03-08 Thread Yun Gao
Hi Yordan,

What are the settings that are changed during the tests?

Best,
Yun


--
From:Yordan Pavlov 
Send Time:2021 Mar. 5 (Fri.) 23:36
To:user 
Subject:New settings are not honored unless checkpoint is cleared.

Hello there,
I am running Flink 1.11.3 on Kubernetes deployment. If I change a
setting and re-deploy my Flink setup, the new setting is correctly
applied in the config file but is not being honored by Flink. In other
words, I can ssh into the pod and check the config file - it has the
new setting as I would expect. However the web interface for the job
keeps showing the old configuration and Flink as a whole keep running
with the old setting. The way to have the new setting considered is to
clear the checkpoint for the job stored in Zookeeper. Then I recover
the job using:

--fromSavepoint path_to_savepoint_or_checkpoint

My presumption is that the job configuration is stored in Zookeeper
along with other Flink data. Could someone shed some light on what I
am observing.

Thank you!

Re: questions about broadcasts

2021-03-08 Thread Yun Gao
Hi Marco,

(a) It is possible for an operator to receive two different kinds of broadcasts:

DataStream<Integer> ints = ...
DataStream<String> strs = ...
ints.broadcast().connect(strs.broadcast())
    .process(new CoProcessFunction<Integer, String, Object>(){...});

(b) A traditional Flink operator cannot accept three different inputs.
There is a new MultipleInputOperator that can accept an arbitrary number
of inputs [1]. However, it is currently not exposed directly to end users,
and you would need to work with some low-level API to use it. An alternative
might be to use a tag to union two of the input streams (or any two of the three
inputs) and use the (keyed)CoProcessFunction above; a rough sketch follows below.
Also note that broadcast is only a partitioner,
and it is treated no differently from other partitioners by downstream
operators.
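
A hedged sketch of that tag-and-union idea, using Flink's Either type to merge two of the
inputs into one stream before connecting the third; the element types, the broadcast on the
third input, and the process logic are illustrative assumptions:

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.types.Either;
    import org.apache.flink.util.Collector;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Integer> ints = env.fromElements(1, 2, 3);
    DataStream<String> strs = env.fromElements("a", "b");
    DataStream<Long> longs = env.fromElements(10L, 20L);

    // tag the first two inputs so they can be union-ed into a single stream
    DataStream<Either<Integer, String>> tagged =
        ints.map(i -> Either.<Integer, String>Left(i))
            .returns(new TypeHint<Either<Integer, String>>() {})
            .union(strs.map(s -> Either.<Integer, String>Right(s))
                       .returns(new TypeHint<Either<Integer, String>>() {}));

    // connect the merged stream with the (broadcast) third input
    tagged.connect(longs.broadcast())
        .process(new CoProcessFunction<Either<Integer, String>, Long, String>() {
            @Override
            public void processElement1(Either<Integer, String> value, Context ctx,
                                        Collector<String> out) {
                out.collect(value.isLeft() ? "int: " + value.left() : "str: " + value.right());
            }

            @Override
            public void processElement2(Long value, Context ctx, Collector<String> out) {
                out.collect("broadcast: " + value);
            }
        })
        .print();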

Best,
Yun



[1] 
https://github.com/apache/flink/blob/51524de8fd337aafd30952873b36216c5a3c43bc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java#L261


--
Sender:Marco Villalobos
Date:2021/03/06 09:47:53
Recipient:user
Theme:questions about broadcasts

Is it possible for an operator to receive two different kinds of broadcasts?  

Is it possible for an operator to receive two different types of streams and a 
broadcast? For example, I know there is a KeyedCoProcessFunction, but is there 
a version of that which can also receive broadcasts? 


Re: Re: Re: Checkpoint Error

2021-03-08 Thread Yun Gao
Hi Navneeth,

Is the attached exception the root cause for the checkpoint failure ?
Namely is it also reported in job manager log?

Also, have you enabled concurrent checkpoint? 
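
In case it helps while checking, a hedged sketch of where the checkpoint concurrency
settings live; the interval and pause values are illustrative only:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        // 1 is the default, i.e. no overlap
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // breathing room between checkpoints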

Best,
 Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Mon Mar 8 13:10:46 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Re: Checkpoint Error

Hi Yun,

Thanks for the response. I checked the mounts and only the JM's and TM's are 
mounted with this EFS. Not sure how to debug this.

Thanks
On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:

Hi Navneeth,

It seems from the stack that the exception is caused by the underlying EFS 
problems ? Have you checked
if there are errors reported for EFS, or if there might be duplicate mounting 
for the same EFS and others
have ever deleted the directory?

Best,
Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Sun Mar 7 15:44:59 2021
Recipients:user 
Subject:Re: Checkpoint Error

Hi All,

Any suggestions?

Thanks
On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan  
wrote:

Hi All,

We are running our streaming job on flink 1.7.2 and we are noticing the below 
error. Not sure what's causing it, any pointers would help. We have 10 TM's 
checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
Caused by: java.io.IOException: Stale file handle
    at java.io.FileOutputStream.close0(Native Method)
    at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
    at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
    at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
    at java.io.FileOutputStream.close(FileOutputStream.java:354)
    at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
    ... 12 more

Thanks

Re: Gradually increasing checkpoint size

2021-03-08 Thread Yun Gao
Hi Dan,

Have you use a too large upperBound or lowerBound?

If not, could you also check the watermark strategy ?
The interval join operator depends on the event-time
timer for cleanup, and the event-time timer would be
triggered via watermark. 
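
For reference, a hedged sketch of the interval join this refers to; the state it keeps is
cleaned up by event-time timers, so watermarks must keep advancing on both inputs. Here
left and right are assumed to be DataStream<Tuple2<String, Long>> with timestamps and
watermarks already assigned, and the bounds are illustrative:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    DataStream<Tuple2<String, Long>> joined = left
        .keyBy(t -> t.f0)
        .intervalJoin(right.keyBy(t -> t.f0))
        .between(Time.minutes(-5), Time.minutes(5))   // keep the bounds as tight as the use case allows
        .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>>() {
            @Override
            public void processElement(Tuple2<String, Long> l, Tuple2<String, Long> r,
                                       Context ctx, Collector<Tuple2<String, Long>> out) {
                out.collect(Tuple2.of(l.f0, l.f1 + r.f1));
            }
        });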

Best,
Yun



 --Original Mail --
Sender:Dan Hill 
Send Date:Mon Mar 8 14:59:48 2021
Recipients:user 
Subject:Gradually increasing checkpoint size

Hi!

I'm running a backfill Flink stream job over older data.  It has multiple 
interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd 
expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint?  My top guess is 
that my checkpoint has a bunch of useless state in it.

- Dan

Missing support for `TestStreamEnvironment#executeAsync`

2021-03-08 Thread Bob Tiernay
Hi all,

I have been trying to test a Flink 1.11 streaming job using the
`DataStreamUtils#collect` utility against a `MiniCluster` based test.
However, I noticed an issue when doing so.

`TestStreamEnvironment` does not implement `executeAsync`. Thus
when `DataStreamUtils#collect` is called, it invokes
`env.executeAsync("Data Stream Collect");` which will instead use
`StreamExecutionEnvironment#executeAsync`'s implementation. This is
problematic since it will create a brand new `MiniCluster` when the
following lines are hit:

CompletableFuture<JobClient> jobClientFuture = executorFactory
   .getExecutor(configuration)
   .execute(streamGraph, configuration);


Any configurations that were applied during the test won't be respected. Is
this expected behavior?

Thanks in advance,

Bob


Re: Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Yun Gao
Hi Rainie,

From the code it seems the current problem does not use time-related functionality like
windows or timers? If so, the problem would be independent of the time type used.

Also, it is not likely due to rebalance(), since the network layer checks sequence
numbers. If there were missing records, there would be a failover.

Since the current logic does not seem to rely on much complex functionality, could
it be possible
that there is some inconsistency between the flink implementation and the
presto one?

Best,
Yun


--
Sender:Rainie Li
Date:2021/03/08 17:14:30
Recipient:Smile
Cc:user
Theme:Re: Flink application has slightly data loss using Processing Time

Thanks for the quick response, Smile.
I don't use window operators or flatmap.
Here is the core logic of my filter, it only iterates on filters list. Will 
rebalance() cause it? 

Thanks again.
Best regards
Rainie
SingleOutputStreamOperator> matchedRecordsStream =
eventStream
.rebalance()
.process(new ProcessFunction>() {
  public void processElement(
  T element,
  ProcessFunction>.Context context,
  Collector> collector) {
for (StreamFilter filter : filters) {
  if (filter.match(element)) {
SubstreamConfig substreamConfig = filter.getSubstreamConfig();
SplitterIntermediateRecord result = new 
SplitterIntermediateRecord<>(
substreamConfig.getKafkaCluster(),
substreamConfig.getKafkaTopic(),
substreamConfig.getCutoverKafkaTopic(),
substreamConfig.getCutoverTimestampInMs(),
element);
collector.collect(result);
  }
}
  }
})
.name("Process-" + eventClass.getSimpleName());
On Mon, Mar 8, 2021 at 1:03 AM Smile  wrote:
Hi Rainie, 

 Could you please provide more information about your processing logic?
 Do you use window operators?
 If there's no time-based operator in your logic, late arrival data won't be
 dropped by default and there might be something wrong with your flat map or
 filter operator. Otherwise, you can use sideOutputLateData() to get the late
 data of the window and have a look at them. See [1] for more information
 about sideOutputLateData().

 [1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
 

 Regards,
 Smile



 --
 Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



KafkaSource Problem

2021-03-08 Thread Bobby Richard
I'm receiving the following exception when trying to use a KafkaSource from
the new DataSource API.

Exception in thread "main" java.lang.NullPointerException
at
org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
at
org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)

Here is my code (kotlin)

val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource {
val builder = KafkaSource.builder()
.setBootstrapServers(params.get("bootstrapServers"))
.setGroupId(params.get("groupId"))
.setStartingOffsets(OffsetsInitializer.earliest())
.setTopics("topic")

.setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

if (params.getBoolean("boundedSource", false)) {
builder.setBounded(OffsetsInitializer.latest())
}

return builder.build()
}




I'm setting the deserializer using the ValueDeserializerWrapper as
described in the KafkaSourceBuilder javadoc example
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html

Looking at the code for the ValueDeserializerWrapper, it appears that the
deserializer isn't actually set until the deserialize method is called, but
getProducedType is actually called first resulting in the
NullPointerException. What am I missing?

Thanks,
Bobby





Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-08 Thread Yuval Itzchakov
Thank you Shenkai,
That does explain what I'm seeing.

Jark / Shenkai - Is there any workaround to get Flink to work with push
watermarks and predicate pushdown until this is resolved?

On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang  wrote:

> Hi, Yuval, Jark, Timo.
>
> Currently the watermark push down happens in the logical rewrite phase but
> the filter push down happens in the local phase, which means the planner
> will first check the Filter push down and then check the watermark push
> down.
>
> I think we need a rule to transpose between the filter and watermark
> assigner or extend the filter push down rule to capture the structure that
> the watermark assigner is the parent of the table scan.
>
> Best,
> Shengkai
>
> Yuval Itzchakov  于2021年3月8日周一 上午12:13写道:
>
>> Hi Jark,
>>
>> Even after implementing both, I don't see the watermark being pushed to
>> the tablesource in the logical plan and avoids predicate pushdown from
>> running.
>>
>> On Sun, Mar 7, 2021, 15:43 Jark Wu  wrote:
>>
>>> Hi Yuval,
>>>
>>> That's correct you will always get a LogicalWatermarkAssigner if you
>>> assigned a watermark.
>>> If you implement SupportsWatermarkPushdown, the LogicalWatermarkAssigner
>>> will be pushed
>>> into TableSource, and then you can push Filter into source if source
>>> implement SupportsFilterPushdown.
>>>
>>> Best,
>>> Jark
>>>
>>> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov  wrote:
>>>
 Hi Timo,
 After investigating this further, this is actually non related to
 implementing SupportsWatermarkPushdown.

 Once I create a TableSchema for my custom source's RowData, and assign
 it a watermark (see my example in the original mail), the plan will always
 include a LogicalWatermarkAssigner. This assigner that is between the
 LogicalTableScan and the LogicalFilter will then go on and fail the
 HepPlanner from invoking the optimization since it requires
 LogicalTableScan to be a direct child of LogicalFilter. Since I have
 LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
 work.

 On Fri, Mar 5, 2021 at 5:59 PM Timo Walther  wrote:

> Hi Yuval,
>
> sorry that nobody replied earlier. Somehow your email fell through the
> cracks.
>
> If I understand you correctly, could would like to implement a table
> source that implements both `SupportsWatermarkPushDown` and
> `SupportsFilterPushDown`?
>
> The current behavior might be on purpose. Filters and Watermarks are
> not
> very compatible. Filtering would also mean that records (from which
> watermarks could be generated) are skipped. If the filter is very
> strict, we would not generate any new watermarks and the pipeline
> would
> stop making progress in time.
>
> Watermark push down is only necessary, if per-partition watermarks are
> required. Otherwise the watermarks are generated in a subsequent
> operator after the source. So you can still use rowtime without
> implementing `SupportsWatermarkPushDown` in your custom source.
>
> I will lookp in Shengkai who worked on this topic recently.
>
> Regards,
> Timo
>
>
> On 04.03.21 18:52, Yuval Itzchakov wrote:
> > Bumping this up again, would appreciate any help if anyone is
> familiar
> > with the blink planner.
> >
> > Thanks,
> > Yuval.
> >
> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov  > > wrote:
> >
> > Hi Jark,
> > Would appreciate your help with this.
> >
> > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <
> ro...@apache.org
> > > wrote:
> >
> > Hi Yuval,
> >
> > I'm not familiar with the Blink planner but probably Jark
> can help.
> >
> > Regards,
> > Roman
> >
> >
> > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
> > mailto:yuva...@gmail.com>> wrote:
> >
> > Update: When I don't set the watermark explicitly on the
> > TableSchema, `applyWatermarkStrategy` never gets called
> on
> > my ScanTableSource, which does make sense. But now the
> > question is what should be done? This feels a bit
> unintuitive.
> >
> > On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
> > mailto:yuva...@gmail.com>> wrote:
> >
> > Hi,
> > Flink 1.12.1, Blink Planner, Scala 2.12
> >
> > I have the following logical plan:
> >
> >
>  LogicalSink(table=[default_catalog.default_database.table], fields=[bar,
> baz, hello_world, a, b])
> > +- LogicalProject(value=[$2],
> > bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
> > 

Re: How to aggregate records of the same pv with a session window in PyFlink

2021-03-08 Thread Xingbo Huang
Hi,
Session window UDAFs are not yet supported in 1.12; support will be provided in 1.13, see JIRA [1] for details.
However, 1.12 does support ProcessFunction and KeyedProcessFunction, see the code in [2].

[1] https://issues.apache.org/jira/browse/FLINK-21630
[2]
https://github.com/apache/flink/blob/release-1.12/flink-python/pyflink/datastream/functions.py

Best,
Xingbo

Hongyuan Ma  wrote on Mon, Mar 8, 2021 at 7:10 PM:

> I would also like to know this. From the docs, PyFlink does not seem to support ProcessFunction yet.
>
>
> On 2021-03-08 19:03, kk wrote:
> hi,all:
>
> Consecutive operations of one account within a period of time count as one pv; once the gap
> exceeds a threshold, a new pv starts. The system ingests streaming logs and computes various
> real-time metrics from them. But with a session window we cannot use a UDAF (user-defined
> aggregate function) to aggregate the log records that belong to the same pv.
> Any advice would be appreciated. Thanks!
>
> session_window = Session.with_gap("60.second").on("pv_time").alias("w")
> t_env.from_path('source') \
>.window(session_window) \
>.group_by("w,pv_id") \
>.select("pv_id,get_act(act)").insert_into("sink")
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t1355/infoflow_2021-03-08_19-02-16.png
> ;
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Stop vs Cancel with savepoint

2021-03-08 Thread Thomas Eckestad
OK, thank you for validating my thoughts =) I created 
https://issues.apache.org/jira/browse/FLINK-21666#

Thanks,
Thomas

On 3 Mar 2021, at 22:02, Chesnay Schepler 
mailto:ches...@apache.org>> wrote:

Your understanding of cancel vs stop(-with-savepoint) is correct.

I agree that we should update the REST API documentation and have a section 
outlining the problems with cancel-with-savepoint.
Would you like to open a ticket yourself?

On 3/3/2021 11:16 AM, Thomas Eckestad wrote:
Hi!

Cancel with savepoint is marked as deprecated in the cli-documentation. It is 
not marked as deprecated in the REST-API documentation though? Is that a 
mistake? At least some recommendation regarding stop vs cancel would be 
appropriate to include in the API doc, or?

As I understand, stop will cancel each operator in the job-DAG bottom-up in a 
gracefull manner. Conceptually meaning, first cancel the sources, then, when 
the operators directly downstream to the sources have drained all pending 
input, those will be canceled as well. This continues until the sinks are done 
as well. Or, maybe more to the point, the checkpoint barrier triggered for the 
savepoint will not be followed by any more input data, the sources will stop 
consuming new data untill the savepoint is complete and the job exits.

Is the above understanding correct? In that case, for some streaming jobs 
without exactly-once sinks, cancel with savepoint might trigger duplication. 
Which should be OK of course since the job needs to handle a restart anyway, 
but it might be beneficial to not generate duplicated output for this specific 
use case if there is a choice where the alternatives have the same cost 
implementation wise...

Is my understanding of cancel vs stop correct? If not what is the real 
practical difference between stop and cancel with savepoint?

To me it feels like cancel with save point should be deprecated in both the 
rest API and the cli and also there should be a text that explains why it is 
deprecated and why usage of it is discouraged, or?

Thanks,
Thomas
Thomas Eckestad
Systems Engineer
Road Perception

NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping, Sweden
Mobile: +46  738 453 937
thomas.eckes...@niradynamics.se
www.niradynamics.se





Re: java options to generate heap dump in EMR not working

2021-03-08 Thread bat man
Issue was with double quotes around the Java options. This worked -

env.java.opts: -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof

On Mon, 8 Mar 2021 at 12:02 PM, Yun Gao  wrote:

> Hi,
>
> I tried with the standalone session (sorry I do not have a yarn cluster in
> hand) and it seems that
> the flink cluster could startup normally. Could you check the log of
> NodeManager to see the detail
> reason that the container does not get launched? Also have you check if
> there are some spell error
> or some unexpected special white space character for the configuration ?
>
> For the case of configuring `env.java.opts`, it seems the JobManager also
> could not be launched with
> this configuration.
>
> Best,
> Yun
>
> --Original Mail --
> *Sender:*bat man 
> *Send Date:*Sat Mar 6 16:03:06 2021
> *Recipients:*user 
> *Subject:*java options to generate heap dump in EMR not working
>
> Hi,
>>
>> I am trying to generate a heap dump to debug a GC overhead OOM. For that
>> I added the below java options in flink-conf.yaml, however after adding
>> this the yarn is not able to launch the containers. The job logs show it
>> goes on requesting for containers from yarn and it gets them, again
>> releases it. then again the same cycle continues. If I remove the option
>> from flink-conf.yaml then the containers are launched and the job starts
>> processing.
>>
>>
>> *env.java.opts.taskmanager: "-XX:+HeapDumpOnOutOfMemoryError
>> -XX:HeapDumpPath=/tmp/dump.hprof"*
>>
>> If I try this then yarn client does not comes up -
>>
>>
>> *env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
>> -XX:HeapDumpPath=/tmp/dump.hprof"*
>>
>> Am I doing anything wrong here?
>>
>> PS: I am using EMR.
>>
>> Thanks,
>> Hemant
>>
>


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-08 Thread bat man
The Java options should not have the double quotes; that was the issue. I
was able to generate the heap dump, and based on the dump I have made some
changes in the code to fix this issue.

This worked -

env.java.opts: -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof

Thanks.

On Mon, 8 Mar 2021 at 7:48 AM, Xintong Song  wrote:

> Hi Hemant,
> I don't see any problem in your settings. Any exceptions suggesting why TM
> containers are not coming up?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sat, Mar 6, 2021 at 3:53 PM bat man  wrote:
>
>> Hi Xintong Song,
>> I tried using the java options to generate heap dump referring to docs[1]
>> in flink-conf.yaml, however after adding this the task manager containers
>> are not coming up. Note that I am using EMR. Am i doing anything wrong here?
>>
>> env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
>> -XX:HeapDumpPath=/tmp/dump.hprof"
>>
>> Thanks,
>> Hemant
>>
>>
>>
>>
>>
>> On Fri, Mar 5, 2021 at 3:05 PM Xintong Song 
>> wrote:
>>
>>> Hi Hemant,
>>>
>>> This exception generally suggests that JVM is running out of heap
>>> memory. Per the official documentation [1], the amount of live data barely
>>> fits into the Java heap having little free space for new allocations.
>>>
>>> You can try to increase the heap size following these guides [2].
>>>
>>> If a memory leak is suspected, to further understand where the memory is
>>> consumed, you may need to dump the heap on OOMs and looking for unexpected
>>> memory usages leveraging profiling tools.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks002.html
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
>>>
>>>
>>>
>>> On Fri, Mar 5, 2021 at 4:24 PM bat man  wrote:
>>>
 Hi,

 Getting the below OOM but the job failed 4-5 times and recovered from
 there.

 j







 *ava.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit
 exceededat
 org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
   at
 org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
   at
 org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
   at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)Caused by:
 java.lang.OutOfMemoryError: GC overhead limit exceeded*

 Is there any way I can debug this. since the job after a few re-starts
 started running fine. what could be the reason behind this.

 Thanks,
 Hemant

>>>


Reply: How to aggregate records of the same pv with a session window in PyFlink

2021-03-08 Thread Hongyuan Ma
I would also like to know this. From the docs, PyFlink does not seem to support ProcessFunction yet.


On 2021-03-08 19:03, kk wrote:
hi,all:
Consecutive operations of one account within a period of time count as one pv; once the gap exceeds a
threshold, a new pv starts. The system ingests streaming logs and computes various real-time metrics
from them. But with a session window we cannot use a UDAF (user-defined aggregate function) to
aggregate the log records that belong to the same pv.
Any advice would be appreciated. Thanks!

session_window = Session.with_gap("60.second").on("pv_time").alias("w")
t_env.from_path('source') \
   .window(session_window) \
   .group_by("w,pv_id") \
   .select("pv_id,get_act(act)").insert_into("sink")



How to aggregate records of the same pv with a session window in PyFlink

2021-03-08 Thread kk
hi,all:
Consecutive operations of one account within a period of time count as one pv; once the gap exceeds a
threshold, a new pv starts. The system ingests streaming logs and computes various real-time metrics
from them. But with a session window we cannot use a UDAF (user-defined aggregate function) to
aggregate the log records that belong to the same pv.
Any advice would be appreciated. Thanks!

session_window = Session.with_gap("60.second").on("pv_time").alias("w")
t_env.from_path('source') \
.window(session_window) \
.group_by("w,pv_id") \
.select("pv_id,get_act(act)").insert_into("sink")


 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How do I call an algorithm written in C++ in Flink?

2021-03-08 Thread Yun Gao
Hi Suxi,

Do you mean you want to call the algorithm written in C++? If so, I think you could
do it the same way as you wrapped it in the SpringBoot project via JNI. You do not
need to add a new operator; you can use the existing Flink API, load your library in
open(), and call the algorithm in the subsequent processing method (a rough sketch
follows below).
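
A hedged sketch of that approach; NativeAlgorithm is a hypothetical JNI wrapper class
and the library name is made up:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class NativeAlgorithmMapper extends RichMapFunction<String, String> {

        private transient NativeAlgorithm algo;   // hypothetical JNI wrapper around the .so

        @Override
        public void open(Configuration parameters) {
            // load the shared library once per task; it must be on java.library.path
            // (or be loaded via an absolute path) on every TaskManager
            System.loadLibrary("myalgorithm");
            algo = new NativeAlgorithm();
        }

        @Override
        public String map(String value) {
            return algo.run(value);               // hypothetical native call
        }
    }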

Best,
Yun



 --Original Mail --
Sender:苏喜 张 <15138217...@163.com>
Send Date:Mon Mar 8 14:12:02 2021
Recipients:user@flink.apache.org 
Subject:How do I call an algorithm written in C++ in Flink?

The company has provided an algorithm written in C++, which has been packaged 
into a.so file. I have built a SpringBoot project, which uses JNI to operate 
the algorithm written in C++. Could you please tell me how to call it in Flink? 
Do i need to define operators, chains of operators?
 

Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Rainie Li
Thanks for the quick response, Smile.
I don't use window operators or flatmap.
Here is the core logic of my filter, it only iterates on filters list.
Will *rebalance()
*cause it?

Thanks again.
Best regards
Rainie

SingleOutputStreamOperator> matchedRecordsStream =
eventStream
.rebalance()
.process(new ProcessFunction>() {
  public void processElement(
  T element,
  ProcessFunction>.Context context,
  Collector> collector) {
for (StreamFilter filter : filters) {
  if (filter.match(element)) {
SubstreamConfig substreamConfig = filter.getSubstreamConfig();
SplitterIntermediateRecord result = new
SplitterIntermediateRecord<>(
substreamConfig.getKafkaCluster(),
substreamConfig.getKafkaTopic(),
substreamConfig.getCutoverKafkaTopic(),
substreamConfig.getCutoverTimestampInMs(),
element);
collector.collect(result);
  }
}
  }
})
.name("Process-" + eventClass.getSimpleName());


On Mon, Mar 8, 2021 at 1:03 AM Smile  wrote:

> Hi Rainie,
>
> Could you please provide more information about your processing logic?
> Do you use window operators?
> If there's no time-based operator in your logic, late arrival data won't be
> dropped by default and there might be something wrong with your flat map or
> filter operator. Otherwise, you can use sideOutputLateData() to get the
> late
> data of the window and have a look at them. See [1] for more information
> about sideOutputLateData().
>
> [1].
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> Regards,
> Smile
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [flink sql-client reading/writing Kerberos-authenticated Hive]

2021-03-08 Thread Rui Li
Then it should be related to https://issues.apache.org/jira/browse/FLINK-20913,
which was fixed in 1.12.2, so you could try upgrading.

On Mon, Mar 8, 2021 at 2:15 PM guoyb <861277...@qq.com> wrote:

> Hello!
> hive.metastore.sasl.enabled is true.
>
>
> When starting the sql client, the authentication info is read correctly and the table names can be read from the metastore.
>
>
> Reading and writing, however, fail authentication.
>
>
>
> --- Original Mail ---
> From: "Rui Li"  Sent: Monday, March 8, 2021, 12:12 PM
> To: "user-zh"  Subject: Re: [flink sql-client reading/writing Kerberos-authenticated Hive]
>
>
> Hi,
>
>
> From the stack trace you posted, reaching the set_ugi method means the client thinks the server does not have Kerberos enabled. Please double-check that the hive-site.xml you point the HiveCatalog at is configured correctly, e.g. whether hive.metastore.sasl.enabled is set to true.
>
> On Sun, Mar 7, 2021 at 5:49 PM 861277...@qq.com <861277...@qq.com
> wrote:
>
>  Environment:
>  flink 1.12.1
>  hive 2.1.0
>  CDH 6.2.0
> 
> 
>  [Problem description]
>  Without Kerberos authentication enabled, the hive tables can be read and written normally.
> 
>  After enabling Kerberos authentication,
>  the metadata from the hive metastore is still read correctly at startup, but the tables can no longer be read or written.
> 
> 
>  [sql-client.sh embedded]
>  Flink SQL> show tables;
>  dimension_table
>  dimension_table1
>  test
> 
> 
>  Flink SQL> select * from test;
>  [ERROR] Could not execute SQL statement. Reason:
>  org.apache.flink.connectors.hive.FlinkHiveException: Failed to
> collect all
>  partitions from hive metaStore
> 
> 
>  [Full log
> 
> /opt/cloudera/parcels/FLINK-1.12.1-BIN-SCALA_2.11/lib/flink/log/flink-root-sql-client-cdh6.com.log]
> 
>  2021-03-07 10:29:18.776 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Trying to connect to localhost/127.0.0.1:6123
>  2021-03-07 10:29:18.777 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address 'cdh6.com/192.168.31.10': Connection
>  refused (Connection refused)
>  2021-03-07 10:29:18.778 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/127.0.0.1': Connection refused
>  (Connection refused)
>  2021-03-07 10:29:18.778 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address
> '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33':
>  Network is unreachable (connect failed)
>  2021-03-07 10:29:18.778 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/192.168.31.10': Connection refused
>  (Connection refused)
>  2021-03-07 10:29:18.779 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/0:0:0:0:0:0:0:1%lo': Network is
>  unreachable (connect failed)
>  2021-03-07 10:29:18.779 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/127.0.0.1': Connection refused
>  (Connection refused)
>  2021-03-07 10:29:18.779 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address
> '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33':
>  Network is unreachable (connect failed)
>  2021-03-07 10:29:18.779 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/192.168.31.10': Connection refused
>  (Connection refused)
>  2021-03-07 10:29:18.780 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/0:0:0:0:0:0:0:1%lo': Network is
>  unreachable (connect failed)
>  2021-03-07 10:29:18.780 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/127.0.0.1': Connection refused
>  (Connection refused)
>  2021-03-07 10:29:18.780 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Could not connect. Waiting for 1600 msecs before next attempt
>  2021-03-07 10:29:20.381 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Trying to connect to localhost/127.0.0.1:6123
>  2021-03-07 10:29:20.381 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address 'cdh6.com/192.168.31.10': Connection
>  refused (Connection refused)
>  2021-03-07 10:29:20.382 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/127.0.0.1': Connection refused
>  (Connection refused)
>  2021-03-07 10:29:20.383 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address
> '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33':
>  Network is unreachable (connect failed)
>  2021-03-07 10:29:20.383 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/192.168.31.10': Connection refused
>  (Connection refused)
>  2021-03-07 10:29:20.383 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/0:0:0:0:0:0:0:1%lo': Network is
>  unreachable (connect failed)
>  2021-03-07 10:29:20.383 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect from address '/127.0.0.1': Connection refused
>  (Connection refused)
>  2021-03-07 10:29:20.384 [main] INFOnbsp;
> org.apache.flink.runtime.net.ConnectionUtilsnbsp;
>  - Failed to connect 

Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Smile
Hi Rainie, 

Could you please provide more information about your processing logic?
Do you use window operators?
If there is no time-based operator in your logic, late-arriving data won't be
dropped by default, and there might instead be something wrong with your flat map
or filter operator. Otherwise, you can use sideOutputLateData() to capture the
late data of the window and have a look at it. See [1] for more information
about sideOutputLateData().

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
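
For reference, a minimal runnable sketch of wiring sideOutputLateData() into a tumbling
event-time window. The source elements, key field, window size, and out-of-orderness bound
below are made up purely for illustration; they are not taken from Rainie's job.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSideOutputExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Toy (key, event-time millis) input; the last element is out of order and,
        // in a real unbounded stream, would typically arrive too late for its window.
        DataStream<Tuple2<String, Long>> events = env
                .fromElements(
                        Tuple2.of("a", 1_000L),
                        Tuple2.of("a", 31_000L),
                        Tuple2.of("a", 62_000L),
                        Tuple2.of("a", 2_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1));

        // Records that arrive after their window has already fired are routed to this tag.
        OutputTag<Tuple2<String, Long>> lateTag =
                new OutputTag<Tuple2<String, Long>>("late-data") {};

        SingleOutputStreamOperator<Tuple2<String, Long>> windowed = events
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .sideOutputLateData(lateTag)
                .sum(1); // placeholder aggregation per 30-second window

        windowed.print("window-result");
        // Late records are not silently dropped; they can be inspected here.
        windowed.getSideOutput(lateTag).print("late-data");

        env.execute("side-output-late-data-example");
    }
}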
 

Regards,
Smile





Re: flink sql: how to compare these real-time computation results with the offline computation results?

2021-03-08 Thread jindy_liu
Right. The problem here is that if we take the offline result as the baseline, the offline result is usually produced at day or hour granularity, while the real-time side may be at second granularity. When the two results are compared in the live environment and they differ, it is hard to tell whether the real-time computation is actually wrong.

Many things could make the result inaccurate, for example a Flink SQL bug, or messages lost in the stream, and so on.





Flink application has slight data loss using Processing Time

2021-03-08 Thread Rainie Li
Hello Flink Community,

Our Flink application is on v1.9. Its basic logic is to consume one large Kafka
topic, filter on some fields, and produce the data to a new Kafka topic.
After comparing the count of the original Kafka topic with that of the generated
Kafka topic on the same field using a Presto query, we see a slight data loss
(around 1.37220156e-7 per hour).
The original Kafka topic collects data from mobile devices, so it can have
late-arriving events. That's why we use processing time, since order does not matter.

This job uses processing time; any idea what could potentially cause this data loss?
Also, if Flink uses processing time, what is the default time window?
Could the default time window cause this?

Any suggestions are appreciated.
Thanks
Best regards
Rainie


Re: Flink job reports a TaskManager connection error

2021-03-08 Thread Smile
Hi,
You can check the logs of the specific TaskManager. In my experience this usually happens when the TaskManager
is killed by the container framework (e.g. YARN) for exceeding its memory budget, or when an exception is thrown
inside the TaskManager. If the log says "received signal 15", it was most likely killed by the container, so check
the container's logs; otherwise look at the specific exception in the TaskManager log.

Smile




BUG: window computation is not triggered after converting a DataStream to a Table

2021-03-08 Thread HunterXHunter
1: When the DataStream was obtained from a table that went through a `group by rowtime` query, the window is never triggered.

For example:
 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test group by msg,rowtime"));

// Obtain the DataStream and define watermark generation
SingleOutputStreamOperator<Tuple2<String, Long>> r =
tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
.filter(x -> x.f0)
// map 
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(((element,
recordTimestamp) -> element.f1))
);


See the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html


// Convert the stream to a Table, specifying the rowtime attribute
tableEnv.createTemporaryView("test5",
r,
$("msg"),
$("rowtime").rowtime());

String sql5 = "select " +
"msg," +
"count(1) cnt" +
" from test5 " +
" group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " +
"";
tableEnv.executeSql("insert into printlnRetractSink " + sql5);


Result: the window operation is never triggered.
Debugging into the source: org.apache.flink.table.runtime.operators.window.WindowOperator
// The returned watermark is always -9223372036854775808
public long getCurrentWatermark() {
return internalTimerService.currentWatermark();
}

// Looking at the running job, the watermark is being generated normally, and
// InternalTimerServiceImpl.advanceWatermark assigns currentWatermark as expected. Yet
// internalTimerService.currentWatermark() still returns -9223372036854775808.

// When the statement
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime"));
// is changed to
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test"));

// the result is correct.
So is this a bug??
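
A possible workaround sketch, assuming the source table `test` is registered with `rowtime`
declared as an event-time attribute (as in the setup above): run the tumbling-window
aggregation directly against `test`, skipping the test3 -> DataStream -> test5 round-trip.

// Workaround sketch only: aggregate directly on `test`, whose rowtime attribute
// is still a valid event-time attribute, instead of the intermediate group-by view.
String directSql =
        "select msg, count(1) cnt " +
        " from test " +
        " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg ";
tableEnv.executeSql("insert into printlnRetractSink " + directSql);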










Re: flink sql: how to compare these real-time computation results with the offline computation results?

2021-03-08 Thread Smile
Hi,
Reconciling real-time and offline results is indeed hard; there is no perfect solution.
A common approach is to also write the real-time output into an offline table, then compare the two offline tables: join them offline and run the concrete comparison logic there.
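
As a rough illustration of that approach, here is a minimal batch comparison sketch. All table
and column names below (offline_result, rt_result, dt, hr, item_id, cnt) are hypothetical, and
both tables are assumed to be registered already, e.g. via a shared Hive catalog.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OfflineVsRealtimeCompare {

    public static void main(String[] args) {
        // Batch-mode Table environment for the offline comparison job.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Rows where the real-time result is missing or differs from the offline baseline.
        String compareSql =
                "SELECT o.dt, o.hr, o.item_id, o.cnt AS offline_cnt, r.cnt AS realtime_cnt " +
                "FROM offline_result o " +
                "LEFT JOIN rt_result r " +
                "  ON o.dt = r.dt AND o.hr = r.hr AND o.item_id = r.item_id " +
                "WHERE r.cnt IS NULL OR o.cnt <> r.cnt";

        tEnv.executeSql(compareSql).print();
    }
}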

Smile


jindy_liu wrote
> Does anyone have an approach I could use as a reference?
> 
> 
> 




