Re:Re: 关于异步 AsyncTableFunction CompletableFuture 的疑问?

2021-12-06 Thread Michael Ran
好的,谢谢,我这边尝试下异步保证顺序,我们这边有些场景
在 2021-12-07 14:17:51,"Caizhi Weng"  写道:
>Hi!
>
>1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?
>
>
>正确。例如 HBaseRowDataAsyncLookupFunction 里就调用了 hbase 提供的 table#get 异步方法实现异步查询。
>
>2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?
>
>
>Async operator 就是为了提高无需保序的操作(例如很多 etl 查维表就不关心顺序)的效率才引入的,如果对顺序有强需求就不能用 async
>operator。
>
>Michael Ran  于2021年12月7日周二 10:33写道:
>
>> deal all:
>> 目前在看table api 中,自定义的异步 join 方法 AsyncTableFunction#eval
>> 方法时,发现接口提供的是:
>> public void eval(CompletableFuture>
>> future,Object... keys) {...}
>>
>>
>> 目前遇到两个问题:
>>
>>
>> 1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?
>> 2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?
>>
>>
>> 有各位大佬方便介绍一下吗?或者更详细的文档说明之类的?
>> 非常感谢。
>>


Re: 关于异步 AsyncTableFunction CompletableFuture 的疑问

2021-12-06 Thread Caizhi Weng
Hi!

1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?


正确。例如 HBaseRowDataAsyncLookupFunction 里就调用了 hbase 提供的 table#get 异步方法实现异步查询。

2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?


Async operator 就是为了提高无需保序的操作(例如很多 etl 查维表就不关心顺序)的效率才引入的,如果对顺序有强需求就不能用 async
operator。


陌↓莣言  于2021年12月7日周二 13:06写道:

> deal all: 目前在看table api 中,自定义的异步 join 方法
> AsyncTableFunction#eval 方法时,发现接口提供的是: public void
> eval(CompletableFuture {...} 目前遇到两个问题: 1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join
> 得自己实现,这个理解对吗? 2. 像join hbase
> 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?
> 有各位大佬方便介绍一下吗?或者更详细的文档说明之类的? 非常感谢。


Re: 关于异步 AsyncTableFunction CompletableFuture 的疑问?

2021-12-06 Thread Caizhi Weng
Hi!

1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?


正确。例如 HBaseRowDataAsyncLookupFunction 里就调用了 hbase 提供的 table#get 异步方法实现异步查询。

2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?


Async operator 就是为了提高无需保序的操作(例如很多 etl 查维表就不关心顺序)的效率才引入的,如果对顺序有强需求就不能用 async
operator。

Michael Ran  于2021年12月7日周二 10:33写道:

> deal all:
> 目前在看table api 中,自定义的异步 join 方法 AsyncTableFunction#eval
> 方法时,发现接口提供的是:
> public void eval(CompletableFuture>
> future,Object... keys) {...}
>
>
> 目前遇到两个问题:
>
>
> 1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?
> 2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?
>
>
> 有各位大佬方便介绍一下吗?或者更详细的文档说明之类的?
> 非常感谢。
>


Re: Order of events in Broadcast State

2021-12-06 Thread Alexey Trenikhun
Thank you David

From: David Anderson 
Sent: Monday, December 6, 2021 1:36:20 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: Order of events in Broadcast State

Event ordering in Flink is only maintained between pairs of events that take 
exactly the same path through the execution graph. So if you have multiple 
instances of A (let's call them A1 and A2), each broadcasting a partition of 
the total rule space, then one instance of B (B1) might receive rule1 from A1 
before rule2 from A2, while B2 might receive rule2 before rule1.

If it fits your needs, one simple way to avoid having problems with this is to 
broadcast from a task with a parallelism of 1. Then every downstream instance 
will receive the broadcast stream in the same order.

David

On Sat, Dec 4, 2021 at 2:45 AM Alexey Trenikhun 
mailto:yen...@msn.com>> wrote:
[1] - 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/
The Broadcast State Pattern | Apache 
Flink
The Broadcast State Pattern # In this section you will learn about how to use 
broadcast state in practise. Please refer to Stateful Stream Processing to 
learn about the concepts behind stateful stream processing. Provided APIs # To 
show the provided APIs, we will start with an example before presenting their 
full functionality. As our running example, we will use the case where we have 
a ...
nightlies.apache.org



From: Alexey Trenikhun mailto:yen...@msn.com>>
Sent: Friday, December 3, 2021 4:33 PM
To: Flink User Mail List mailto:user@flink.apache.org>>
Subject: Order of events in Broadcast State

Hello,
Trying to understand what statement "Order of events in Broadcast State may 
differ across tasks" in [1] means.
Let's say I have keyed function "A" which broadcasting stream of rules, 
KeyedBroadcastProcessFunction  "B" receives rules and updates broadcast state, 
like example in [1]. Let's say "A" broadcasts "rule 1" with name X, then "A" 
(same key) broadcasts "rule 2" with same name X, is there guarantee that 
eventually broadcast state will contain "rule 2" or since there is no ordering, 
B could receive "rule 2", then "rule 1" and broadcast state will end up with 
{X="rule 1"} forever ?

Thanks,
Alexey


Re: Table DataStream Conversion Lost Watermark

2021-12-06 Thread Yunfeng Zhou
Hi Timo,

Thanks for this information. Since it is confirmed that toDataStream is
functioning correctly and that I can avoid this problem by not using
fromValues in my implementation, I think I have got enough information for
my current work and don't need to rediscuss fromDatastream's behavior.

Best regards,
Yunfeng

On Tue, Dec 7, 2021 at 12:42 AM Timo Walther  wrote:

> Hi Yunfeng,
>
> it seems this is a deeper issue with the fromValues implementation.
> Under the hood, it still uses the deprecated InputFormat stack. And as
> far as I can see, there we don't emit a final MAX_WATERMARK. I will
> definitely forward this.
>
> But toDataStream forwards watermarks correctly.
>
> I hope this helps. Or do you think we should also rediscuss the
> fromDataStream watermark behavior?
>
> Regards,
> Timo
>
>
> On 06.12.21 10:26, Yunfeng Zhou wrote:
> > Hi Timo,
> >
> > Thanks for your response. I encountered another problem that might be
> > relevant to the watermark as we discussed above.
> >
> > In the test cases shown below. I would create a table from some data,
> > convert it to datastream and do windowAll().reduce() on it. If we need
> > to explicitly specify a `rowtime` metadata column in order to make the
> > table pass timestamps to the converted datastream, then both the test
> > cases should print out empty lists. In fact, one of them could print out
> > a list with some data. The only difference between them is that I
> > changed the value of some input data. This behavior can be reproduced
> > under Flink ML's latest java environment and configurations.
> >
> > Is this the expected behavior of `toDataStream`, or I have accidentally
> > encountered a bug?
> >
> > Best regards,
> > Yunfeng
> >
> > ```java
> >
> > public class SimpleTest {
> >  @Test
> > public void testSimple1()throws Exception {
> >  StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> >
> > Table inputTable = tEnv.fromValues(
> >  DataTypes.ROW(
> >  DataTypes.FIELD("weight", DataTypes.DOUBLE()),
> > DataTypes.FIELD("f0", DataTypes.STRING()),
> > DataTypes.FIELD("f1", DataTypes.DOUBLE()),
> > DataTypes.FIELD("f2", DataTypes.DOUBLE()),
> > DataTypes.FIELD("f3", DataTypes.DOUBLE()),
> > DataTypes.FIELD("f4", DataTypes.INT()),
> > DataTypes.FIELD("label", DataTypes.STRING())
> >  ),
> > Row.of(1., "a", 1., 1., 1., 2, "l1"),
> > Row.of(1., "a", 1., 1., 1., 2, "l1"),
> > Row.of(1., "b", 0., 1., 1., 3, "l1"),
> > Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
> > Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
> > Row.of(1., "a", 1., 1., 0., 1, "l0"),
> > Row.of(2., "d", 1., 1., 0., 1, "l0")
> >  );
> >
> > DataStream input = tEnv.toDataStream(inputTable);
> >
> > System.out.println(IteratorUtils.toList(input
> >  .windowAll(EndOfStreamWindows.get())
> >  .reduce((ReduceFunction) (row, t1) -> row)
> >  .executeAndCollect()
> >  ));
> > }
> >
> >
> >  @Test
> > public void testSimple2()throws Exception {
> >  StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> >
> > Table inputTable = tEnv.fromValues(
> >  DataTypes.ROW(
> >  DataTypes.FIELD("weight", DataTypes.DOUBLE()),
> > DataTypes.FIELD("f0", DataTypes.STRING()),
> > DataTypes.FIELD("f1", DataTypes.DOUBLE()),
> > DataTypes.FIELD("f2", DataTypes.DOUBLE()),
> > DataTypes.FIELD("f3", DataTypes.DOUBLE()),
> > DataTypes.FIELD("f4", DataTypes.INT()),
> > DataTypes.FIELD("label", DataTypes.STRING())
> >  ),
> > Row.of(1., "a", 1., 1., 1., 2, "l1"),
> > Row.of(1., "a", 1., 0., 1., 2, "l1"),
> > Row.of(1., "b", 0., 1., 1., 3, "l1"),
> > Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
> > Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
> > Row.of(1., "a", 1., 1.5, 0., 1, "l0"),
> > Row.of(2., "d", 1., 1., 0., 1, "l0")
> >  );
> >
> > DataStream input = tEnv.toDataStream(inputTable);
> >
> > System.out.println(IteratorUtils.toList(input
> >  .windowAll(EndOfStreamWindows.get())
> >  .reduce((ReduceFunction) (row, t1) -> row)
> >  .executeAndCollect()
> >  ));
> > }
> > }
> >
> > ```
> >
> > ```java
> >
> > /**
> > * A {@link WindowAssigner} that assigns all elements of a bounded input
> > stream into one window
> > * pane. The results are emitted once the input stream has ended.
> > */
> > public class EndOfStreamWindowsextends WindowAssigner TimeWindow> {
> >
> >  private static final EndOfStreamWindowsINSTANCE =new
> EndOfStreamWindows();
> >
> > private EndOfStreamWindows() {}
> >
> >  public static EndOfStreamWindowsget() {
> >  return INSTANCE;
> > }
> >
> >  @Override
> > public CollectionassignWindows(
> >  Object 

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-12-06 Thread Natu Lauchande
Hey Timo and Flink community,

I wonder if there is a fix for this issue. The last time I rollbacked to
version 12 of Flink and downgraded Ververica.

I am really keen to leverage the new features on the latest versions of
Ververica 2.5+ , i have tried a myriad of tricks suggested ( example :
building the image with hadoop-client libraries) :

java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getDeclaredConstructors(Class.java:2020)
at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:
1961)
at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:
272)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:
2003)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.flink.util.InstantiationUtil.deserializeObject(
InstantiationUtil.java:615)
at org.apache.flink.util.InstantiationUtil.deserializeObject(
InstantiationUtil.java:600)
at org.apache.flink.util.InstantiationUtil.deserializeObject(
InstantiationUtil.java:587)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
InstantiationUtil.java:541)
at org.apache.flink.streaming.api.graph.StreamConfig
.getStreamOperatorFactory(StreamConfig.java:322)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
OperatorChain.java:159)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(
StreamTask.java:551)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(
StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.
Configuration
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader
.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader
.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(
FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 48 more

This error occurs when writing to StreamFileWriting on S3 in Parquet
format.


Thanks,
Natu

On Thu, Jul 22, 2021 at 3:53 PM Timo Walther  wrote:

> Thanks, this should definitely 

???????? AsyncTableFunction CompletableFuture ??????

2021-12-06 Thread ?????w??
deal all?? table api  join  
AsyncTableFunction#eval ?? public void 
eval(CompletableFuture

Re: [DISCUSS] Drop Zookeeper 3.4

2021-12-06 Thread Dongwon Kim
When should I prepare for upgrading ZK to 3.5 or newer?
We're operating a Hadoop cluster w/ ZK 3.4.6 for running only Flink jobs.
Just hope that the rolling update is not that painful - any advice on this?

Best,

Dongwon

On Tue, Dec 7, 2021 at 3:22 AM Chesnay Schepler  wrote:

> Current users of ZK 3.4 and below would need to upgrade their Zookeeper
> installation that is used by Flink to 3.5+.
>
> Whether K8s users are affected depends on whether they use ZK or not. If
> they do, see above, otherwise they are not affected at all.
>
> On 06/12/2021 18:49, Arvid Heise wrote:
>
> Could someone please help me understand the implications of the upgrade?
>
> As far as I understood this upgrade would only affect users that have a
> zookeeper shared across multiple services, some of which require ZK 3.4-? A
> workaround for those users would be to run two ZKs with different versions,
> eventually deprecating old ZK, correct?
>
> If that is the only limitation, I'm +1 for the proposal since ZK 3.4 is
> already EOL.
>
> How are K8s users affected?
>
> Best,
>
> Arvid
>
> On Mon, Dec 6, 2021 at 2:00 PM Chesnay Schepler 
> wrote:
>
>> ping @users; any input on how this would affect you is highly appreciated.
>>
>> On 25/11/2021 22:39, Chesnay Schepler wrote:
>> > I included the user ML in the thread.
>> >
>> > @users Are you still using Zookeeper 3.4? If so, were you planning to
>> > upgrade Zookeeper in the near future?
>> >
>> > I'm not sure about ZK compatibility, but we'd also upgrade Curator to
>> > 5.x, which doesn't support ookeeperK 3.4 anymore.
>> >
>> > On 25/11/2021 21:56, Till Rohrmann wrote:
>> >> Should we ask on the user mailing list whether anybody is still using
>> >> ZooKeeper 3.4 and thus needs support for this version or can a
>> ZooKeeper
>> >> 3.5/3.6 client talk to a ZooKeeper 3.4 cluster? I would expect that
>> >> not a
>> >> lot of users depend on it but just to make sure that we aren't
>> >> annoying a
>> >> lot of our users with this change. Apart from that +1 for removing it
>> if
>> >> not a lot of user depend on it.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Wed, Nov 24, 2021 at 11:03 AM Matthias Pohl > >
>> >> wrote:
>> >>
>> >>> Thanks for starting this discussion, Chesnay. +1 from my side. It's
>> >>> time to
>> >>> move forward with the ZK support considering the EOL of 3.4 you
>> already
>> >>> mentioned. The benefits we gain from upgrading Curator to 5.x as a
>> >>> consequence is another plus point. Just for reference on the
>> >>> inconsistent
>> >>> state issue you mentioned: FLINK-24543 [1].
>> >>>
>> >>> Matthias
>> >>>
>> >>> [1] https://issues.apache.org/jira/browse/FLINK-24543
>> >>>
>> >>> On Wed, Nov 24, 2021 at 10:19 AM Chesnay Schepler > >
>> >>> wrote:
>> >>>
>>  Hello,
>> 
>>  I'd like to drop support for Zookeeper 3.4 in 1.15, upgrading the
>>  default to 3.5 with an opt-in for 3.6.
>> 
>>  Supporting Zookeeper 3.4 (which is already EOL) prevents us from
>>  upgrading Curator to 5.x, which would allow us to properly fix an
>>  issue
>>  with inconsistent state. It is also required to eventually support ZK
>> >>> 3.6.
>> >
>> >
>>
>>
>


Re: [DISCUSS] Drop Zookeeper 3.4

2021-12-06 Thread Yang Wang
FYI:

We(Alibaba) are widely using ZooKeeper 3.5.5 for all the YARN and some K8s
Flink high-available applications.


Best,
Yang

Chesnay Schepler  于2021年12月7日周二 上午2:22写道:

> Current users of ZK 3.4 and below would need to upgrade their Zookeeper
> installation that is used by Flink to 3.5+.
>
> Whether K8s users are affected depends on whether they use ZK or not. If
> they do, see above, otherwise they are not affected at all.
>
> On 06/12/2021 18:49, Arvid Heise wrote:
> > Could someone please help me understand the implications of the upgrade?
> >
> > As far as I understood this upgrade would only affect users that have
> > a zookeeper shared across multiple services, some of which require ZK
> > 3.4-? A workaround for those users would be to run two ZKs with
> > different versions, eventually deprecating old ZK, correct?
> >
> > If that is the only limitation, I'm +1 for the proposal since ZK 3.4
> > is already EOL.
> >
> > How are K8s users affected?
> >
> > Best,
> >
> > Arvid
> >
> > On Mon, Dec 6, 2021 at 2:00 PM Chesnay Schepler 
> > wrote:
> >
> > ping @users; any input on how this would affect you is highly
> > appreciated.
> >
> > On 25/11/2021 22:39, Chesnay Schepler wrote:
> > > I included the user ML in the thread.
> > >
> > > @users Are you still using Zookeeper 3.4? If so, were you
> > planning to
> > > upgrade Zookeeper in the near future?
> > >
> > > I'm not sure about ZK compatibility, but we'd also upgrade
> > Curator to
> > > 5.x, which doesn't support ookeeperK 3.4 anymore.
> > >
> > > On 25/11/2021 21:56, Till Rohrmann wrote:
> > >> Should we ask on the user mailing list whether anybody is still
> > using
> > >> ZooKeeper 3.4 and thus needs support for this version or can a
> > ZooKeeper
> > >> 3.5/3.6 client talk to a ZooKeeper 3.4 cluster? I would expect
> > that
> > >> not a
> > >> lot of users depend on it but just to make sure that we aren't
> > >> annoying a
> > >> lot of our users with this change. Apart from that +1 for
> > removing it if
> > >> not a lot of user depend on it.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Wed, Nov 24, 2021 at 11:03 AM Matthias Pohl
> > 
> > >> wrote:
> > >>
> > >>> Thanks for starting this discussion, Chesnay. +1 from my side.
> > It's
> > >>> time to
> > >>> move forward with the ZK support considering the EOL of 3.4
> > you already
> > >>> mentioned. The benefits we gain from upgrading Curator to 5.x as
> a
> > >>> consequence is another plus point. Just for reference on the
> > >>> inconsistent
> > >>> state issue you mentioned: FLINK-24543 [1].
> > >>>
> > >>> Matthias
> > >>>
> > >>> [1] https://issues.apache.org/jira/browse/FLINK-24543
> > >>>
> > >>> On Wed, Nov 24, 2021 at 10:19 AM Chesnay Schepler
> > 
> > >>> wrote:
> > >>>
> >  Hello,
> > 
> >  I'd like to drop support for Zookeeper 3.4 in 1.15, upgrading
> the
> >  default to 3.5 with an opt-in for 3.6.
> > 
> >  Supporting Zookeeper 3.4 (which is already EOL) prevents us from
> >  upgrading Curator to 5.x, which would allow us to properly
> > fix an
> >  issue
> >  with inconsistent state. It is also required to eventually
> > support ZK
> > >>> 3.6.
> > >
> > >
> >
>


关于异步 AsyncTableFunction CompletableFuture 的疑问?

2021-12-06 Thread Michael Ran
deal all:
目前在看table api 中,自定义的异步 join 方法 AsyncTableFunction#eval 方法时,发现接口提供的是:
public void eval(CompletableFuture> 
future,Object... keys) {...}


目前遇到两个问题:


1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?
2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?


有各位大佬方便介绍一下吗?或者更详细的文档说明之类的?
非常感谢。
  

回复: flink cdc支持mysql整库同步进hudi湖吗?

2021-12-06 Thread chengyanan1...@foxmail.com
支持,例子参考hudi官网



chengyanan1...@foxmail.com
 
发件人: casel.chen
发送时间: 2021-12-06 23:55
收件人: user-zh@flink.apache.org
主题: flink cdc支持mysql整库同步进hudi湖吗?
flink cdc支持mysql整库同步进hudi湖吗?如果支持的话,希望能给一个例子,还要求能够支持schema变更。谢谢! 


Re: Issue with incremental checkpointing size

2021-12-06 Thread Caizhi Weng
Hi!

the checkpointing size is not going beyond 300 MB


Is 300MB the total size of checkpoint or the incremental size of
checkpoint? If it is the latter one, Flink will only store necessary
information (for example the keys and the fields that are selected) in
checkpoint and it is compressed, so for 3~5GB input records it is
reasonable for the incremental checkpoint size to shrink to ~300MB. Of
course this depends on the detailed workflow.

there must be something bottleneck with Flink RocksDB configurations.
>

By "bottleneck" do you mean the checkpoint speed is too slow? If yes you
can try to increase the parallelism of the job so that there will be less
burden on each parallelism when making checkpoints.

Vidya Sagar Mula  于2021年12月7日周二 08:04写道:

> Hi,
>
> In my project, we are trying to configure the "Incremental checkpointing"
> with RocksDB in the backend.
>
> We are using Flink 1.11 version and RockDB with AWS : S3 backend
>
> Issue:
> --
> In my pipeline, my window size is 5 mins and the incremental checkpointing
> is happening for every 2 mins.
> I am pumping the data in such a way that the keys are not the same for
> each record. That means, the incremental checkpointing size should keep
> increasing for each checkpoint.
>
> So, the expectation here is that the size of the checkpointing should
> reach atleast 3-5 GB with the amount of the data pumped in.
>
> However, the checkpointing size is not going beyond 300 MB and that too it
> is taking around 2 mins duration for taking this 300 MB checkpoint.
>
> In my set up, I am using
>
> Cluster: Cloud cluster with instance storage.
> Memory : 20 GB,
> Heap : 10 GB
> Flink Memory: 4.5 GB
> Flink Version : 1.11
> Back end: RocksDB with AWS S3 backend
>
>
> I would feel that, there must be something bottleneck with Flink RocksDB
> configurations.
> Can you please advise me?
>
> Thanks,
>
>
>
>
>


Issue with incremental checkpointing size

2021-12-06 Thread Vidya Sagar Mula
Hi,

In my project, we are trying to configure the "Incremental checkpointing"
with RocksDB in the backend.

We are using Flink 1.11 version and RockDB with AWS : S3 backend

Issue:
--
In my pipeline, my window size is 5 mins and the incremental checkpointing
is happening for every 2 mins.
I am pumping the data in such a way that the keys are not the same for each
record. That means, the incremental checkpointing size should keep
increasing for each checkpoint.

So, the expectation here is that the size of the checkpointing should reach
atleast 3-5 GB with the amount of the data pumped in.

However, the checkpointing size is not going beyond 300 MB and that too it
is taking around 2 mins duration for taking this 300 MB checkpoint.

In my set up, I am using

Cluster: Cloud cluster with instance storage.
Memory : 20 GB,
Heap : 10 GB
Flink Memory: 4.5 GB
Flink Version : 1.11
Back end: RocksDB with AWS S3 backend


I would feel that, there must be something bottleneck with Flink RocksDB
configurations.
Can you please advise me?

Thanks,


Re: [DISCUSS] Deprecate Java 8 support

2021-12-06 Thread Nicolás Ferrario
Oh my bad, it must be Statefun then. I remember I needed to play around
with that for _some_ build.

On Wed, Dec 1, 2021 at 7:48 PM Chesnay Schepler  wrote:

> Flink can be built with Java 11 since 1.10. If I recall correctly we
> solved the tools.jar issue, which Hadoop depends on, by excluding that
> dependency. As far as we could tell it's not actually required.
>
> On 01/12/2021 19:56, Nicolás Ferrario wrote:
>
> Hi all, this would be awesome, I'm so tired of seeing Java 8 everywhere
> (reminds me of Python 2.7).
>
> We're currently building our code against Java 11 because that's the
> latest version of Java available as a Flink Docker image, but it'd be great
> to use newer versions. I think it would also help to clean up dependencies
> and hopefully no longer have incompatibility issues.
> For example, right now it's not possible to build Flink with Java 9+
> because of a Maven dependency. Using JDK 8 or copying "tools.jar" to any
> newer JDK version fixes it (see more:
> https://stackoverflow.com/questions/53707666/how-to-get-tools-jar-for-openjdk-11-on-windows
> ).
>
> Official support for Java 17 would be great.
>
> Greetings!
>
> On Wed, Dec 1, 2021 at 7:51 AM Chesnay Schepler 
> wrote:
>
>> Hello Gavin,
>>
>> If you run into any issues with Java 17, please report them in
>> FLINK-15736 .
>> I recently did some experiments with Java 17 myself; I would think that
>> you will run into some blockers (like ASM requiring an upgrade
>> , or missing
>> --add-opens/--add-exports
>> ).
>>
>> On 01/12/2021 11:12, Gavin Lee wrote:
>>
>> Thanks for sharing this info with us Chesnay.
>> We've been using Flink for 5 years,  and upgraded to 1.13.2 months ago.
>> The java version is still 8.
>> Currently we're testing with java 17 in our staging environment. There
>> are no special concerns.
>> Will update when tests complete.
>>
>>
>> On Tue, Nov 30, 2021 at 1:18 AM Chesnay Schepler 
>> wrote:
>>
>>> Hello,
>>>
>>> we recently had a discussion on the dev mailing list for deprecating
>>> support for Java 8 in 1.15, with a general consensus in favor of it.
>>>
>>> I now wanted to check in with you, our users, to see what you have got
>>> to say about that.
>>>
>>> Why are we interested in deprecating Java 8 support now (and in
>>> eventually removing it)?
>>>
>>> The main reason is that supporting the recently released Java 17 (and
>>> subsequent versions), while maintaining Java 8 support,
>>> will be more complicated than if Java 11 were the oldest release
>>> version. Essentially because Java 11/17 have both crossed the Java 9 chasm.
>>>
>>> We will still have to bite this bullet in any case (because Java 17 is
>>> out *now *but we are *not *dropping Java 8 *now*), but we would still
>>> like to signal that users should upgrade to Java 11 so that we can
>>> *eventually* clean this up.
>>>
>>> Furthermore, it is currently hard to justify investing time into
>>> benchmarks/performance improvements that are specific to Java 11+, because
>>> they provide no benefit to Java 8.
>>> What does the deprecation mean exactly?
>>>
>>> It will primarily mean that a warning will be logged when you run Flink
>>> on Java 8.
>>> We *may* change the default Java version of the Docker images to Java
>>> 11 (the java8 tags will remain),
>>> and we will put a larger emphasis on Flink's performance on Java 11.
>>> Does that mean that Java 8 support will be removed in 1.16/1.17?
>>>
>>> No. We are not putting a hard-date on the removal of Java 8 support at
>>> this time.
>>> Will this mean that at some point we'll surprise you with the removal of
>>> Java 8 support in the next release?
>>>
>>> No. We will announce the removal ahead of time by *at least* half a
>>> year / 2+ releases (probably closer to a full year).
>>> Is the deprecation already decided?
>>>
>>> No. The responses in this thread are integral for deciding whether a
>>> deprecation at this time makes sense.
>>>
>>>
>>> If you are still using Java 8 at the moment, then we would appreciate if
>>> you could tell us whether you already have a time-frame for
>>> when you intend to upgrade to Java 11. We'd also be interested in
>>> anything that blocks your migration to Java 11.
>>>
>>>
>>> Please raise concerns you have, and feel free to ask questions.
>>>
>>
>>
>> --
>> Gavin
>>
>>
>>
>


Re: [DISCUSS] Drop Zookeeper 3.4

2021-12-06 Thread Chesnay Schepler
Current users of ZK 3.4 and below would need to upgrade their Zookeeper 
installation that is used by Flink to 3.5+.


Whether K8s users are affected depends on whether they use ZK or not. If 
they do, see above, otherwise they are not affected at all.


On 06/12/2021 18:49, Arvid Heise wrote:

Could someone please help me understand the implications of the upgrade?

As far as I understood this upgrade would only affect users that have 
a zookeeper shared across multiple services, some of which require ZK 
3.4-? A workaround for those users would be to run two ZKs with 
different versions, eventually deprecating old ZK, correct?


If that is the only limitation, I'm +1 for the proposal since ZK 3.4 
is already EOL.


How are K8s users affected?

Best,

Arvid

On Mon, Dec 6, 2021 at 2:00 PM Chesnay Schepler  
wrote:


ping @users; any input on how this would affect you is highly
appreciated.

On 25/11/2021 22:39, Chesnay Schepler wrote:
> I included the user ML in the thread.
>
> @users Are you still using Zookeeper 3.4? If so, were you
planning to
> upgrade Zookeeper in the near future?
>
> I'm not sure about ZK compatibility, but we'd also upgrade
Curator to
> 5.x, which doesn't support ookeeperK 3.4 anymore.
>
> On 25/11/2021 21:56, Till Rohrmann wrote:
>> Should we ask on the user mailing list whether anybody is still
using
>> ZooKeeper 3.4 and thus needs support for this version or can a
ZooKeeper
>> 3.5/3.6 client talk to a ZooKeeper 3.4 cluster? I would expect
that
>> not a
>> lot of users depend on it but just to make sure that we aren't
>> annoying a
>> lot of our users with this change. Apart from that +1 for
removing it if
>> not a lot of user depend on it.
>>
>> Cheers,
>> Till
>>
>> On Wed, Nov 24, 2021 at 11:03 AM Matthias Pohl

>> wrote:
>>
>>> Thanks for starting this discussion, Chesnay. +1 from my side.
It's
>>> time to
>>> move forward with the ZK support considering the EOL of 3.4
you already
>>> mentioned. The benefits we gain from upgrading Curator to 5.x as a
>>> consequence is another plus point. Just for reference on the
>>> inconsistent
>>> state issue you mentioned: FLINK-24543 [1].
>>>
>>> Matthias
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-24543
>>>
>>> On Wed, Nov 24, 2021 at 10:19 AM Chesnay Schepler

>>> wrote:
>>>
 Hello,

 I'd like to drop support for Zookeeper 3.4 in 1.15, upgrading the
 default to 3.5 with an opt-in for 3.6.

 Supporting Zookeeper 3.4 (which is already EOL) prevents us from
 upgrading Curator to 5.x, which would allow us to properly
fix an
 issue
 with inconsistent state. It is also required to eventually
support ZK
>>> 3.6.
>
>



Re: GenericWriteAheadSink, declined checkpoint for a finished source

2021-12-06 Thread Dawid Wysakowicz
Hi,

Sorry to hear it's hard to find the option. It is part of the 1.14
release[1]. It is also documented how to enable it[2]. Happy to hear how
we can improve the situation here.


As for the exception. Are you seeing this exception occur repeatedly for
the same task? I can imagine a situation that with frequent checkpoints
a Task might finish while there is an RPC triggering request pending
somewhere on the wire. In that case such a checkpoint could fail, but
the next triggered should not try to trigger the FINISHED task anymore.


Does it cause problems in your pipeline or are you just concerned with
the entry in logs?

Best,

Dawid


[1]
https://github.com/apache/flink/blob/ef0e17ad6319175ce0054fc3c4db14b78e690dd6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java#L236

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta


On 06/12/2021 18:24, James Sandys-Lumsdaine wrote:
> Hello again,
>
> We recently upgraded from Flink 1.12.3 to 1.14.0 and we were hoping it
> would solve our issue with checkpointing with finished data sources.
> We need the checkpointing to work to trigger Flink's
> GenericWriteAheadSink class.
>
> Firstly, the constant mentioned on FLIP-147 that enables the feature
> isn't available as far as we can see
> (ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH). It's not in ConfigConstants
> or CheckpointConfig for example. So instead we enabled with the following:
>
> conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled",
> true);
> StreamExecutionEnvironment env
> = StreamExecutionEnvironment .createLocalEnvironmentWithWebUI(config)
> env.enableCheckpointing(30 * 1000);
> ...
>
> We can see the constant available in 1.15 on Google but not the
> version we were expecting (1.14.0).
>
> Previously we had to have long Thread.sleep(x) in to keep the sources
> alive when checkpoints were taken. When we enable this feature using
> the explicit string and removed these hacks we start seeing these errors:
>
> INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.e.ExecutionGraph
> Source: Order JDBC Source (1/1) (e015c4f0910fb27e15fec063616ab785)
> switched from RUNNING to FINISHED.
>
>  
>
> [some lines removed for brevity]
>
> * *
>
> INFO 
> [flink-akka.actor.default-dispatcher-7] *o.a.f.r.c.CheckpointCoordinator
> Triggering Checkpoint 5 for job 53d42ae669fad6cc8df2fe8f5706b48d
> failed due to {}*
>
> org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager
> received a checkpoint request for unknown
> task e015c4f0910fb27e15fec063616ab785. Failure reason: Task local
> checkpoint failure.
>
>  at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:966)
> ~[flink-runtime-1.14.0.jar:1.14.0]
>
>  at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
> ~[na:na]
>
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[na:1.8.0_91]
>
>  at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
>
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
> ~[na:na]
>
>  at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> ~[na:na]
>
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
> ~[na:na]
>
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> ~[na:na]
>
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> ~[na:na]
>
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> ~[na:na]
>
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> ~[na:na]
>
>  at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[scala-library-2.11.12.jar:na]
>
>  at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[scala-library-2.11.12.jar:na]
>
>  at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> ~[na:na]
>
>  at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[scala-library-2.11.12.jar:na]
>
>  at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[scala-library-2.11.12.jar:na]
>
>  at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[scala-library-2.11.12.jar:na]
>
>  at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[na:na]
>
>  at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[na:na]
>
>  at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[na:na]
>
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[na:na]
>
>  at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[na:na]
>
>  at 

Re: [DISCUSS] Drop Zookeeper 3.4

2021-12-06 Thread Arvid Heise
Could someone please help me understand the implications of the upgrade?

As far as I understood this upgrade would only affect users that have a
zookeeper shared across multiple services, some of which require ZK 3.4-? A
workaround for those users would be to run two ZKs with different versions,
eventually deprecating old ZK, correct?

If that is the only limitation, I'm +1 for the proposal since ZK 3.4 is
already EOL.

How are K8s users affected?

Best,

Arvid

On Mon, Dec 6, 2021 at 2:00 PM Chesnay Schepler  wrote:

> ping @users; any input on how this would affect you is highly appreciated.
>
> On 25/11/2021 22:39, Chesnay Schepler wrote:
> > I included the user ML in the thread.
> >
> > @users Are you still using Zookeeper 3.4? If so, were you planning to
> > upgrade Zookeeper in the near future?
> >
> > I'm not sure about ZK compatibility, but we'd also upgrade Curator to
> > 5.x, which doesn't support ookeeperK 3.4 anymore.
> >
> > On 25/11/2021 21:56, Till Rohrmann wrote:
> >> Should we ask on the user mailing list whether anybody is still using
> >> ZooKeeper 3.4 and thus needs support for this version or can a ZooKeeper
> >> 3.5/3.6 client talk to a ZooKeeper 3.4 cluster? I would expect that
> >> not a
> >> lot of users depend on it but just to make sure that we aren't
> >> annoying a
> >> lot of our users with this change. Apart from that +1 for removing it if
> >> not a lot of user depend on it.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Nov 24, 2021 at 11:03 AM Matthias Pohl 
> >> wrote:
> >>
> >>> Thanks for starting this discussion, Chesnay. +1 from my side. It's
> >>> time to
> >>> move forward with the ZK support considering the EOL of 3.4 you already
> >>> mentioned. The benefits we gain from upgrading Curator to 5.x as a
> >>> consequence is another plus point. Just for reference on the
> >>> inconsistent
> >>> state issue you mentioned: FLINK-24543 [1].
> >>>
> >>> Matthias
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-24543
> >>>
> >>> On Wed, Nov 24, 2021 at 10:19 AM Chesnay Schepler 
> >>> wrote:
> >>>
>  Hello,
> 
>  I'd like to drop support for Zookeeper 3.4 in 1.15, upgrading the
>  default to 3.5 with an opt-in for 3.6.
> 
>  Supporting Zookeeper 3.4 (which is already EOL) prevents us from
>  upgrading Curator to 5.x, which would allow us to properly fix an
>  issue
>  with inconsistent state. It is also required to eventually support ZK
> >>> 3.6.
> >
> >
>
>


Re: GenericWriteAheadSink, declined checkpoint for a finished source

2021-12-06 Thread James Sandys-Lumsdaine
Hello again,

We recently upgraded from Flink 1.12.3 to 1.14.0 and we were hoping it would 
solve our issue with checkpointing with finished data sources. We need the 
checkpointing to work to trigger Flink's GenericWriteAheadSink class.

Firstly, the constant mentioned on FLIP-147 that enables the feature isn't 
available as far as we can see (ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH). It's 
not in ConfigConstants or CheckpointConfig for example. So instead we enabled 
with the following:

conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled",
 true);
StreamExecutionEnvironment env = StreamExecutionEnvironment 
.createLocalEnvironmentWithWebUI(config)
env.enableCheckpointing(30 * 1000);
...

We can see the constant available in 1.15 on Google but not the version we were 
expecting (1.14.0).

Previously we had to have long Thread.sleep(x) in to keep the sources alive 
when checkpoints were taken. When we enable this feature using the explicit 
string and removed these hacks we start seeing these errors:


INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.e.ExecutionGraph Source: 
Order JDBC Source (1/1) (e015c4f0910fb27e15fec063616ab785) switched from 
RUNNING to FINISHED.



[some lines removed for brevity]



INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.c.CheckpointCoordinator 
Triggering Checkpoint 5 for job 53d42ae669fad6cc8df2fe8f5706b48d failed due to 
{}

org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager received a 
checkpoint request for unknown task e015c4f0910fb27e15fec063616ab785. Failure 
reason: Task local checkpoint failure.

 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:966)
 ~[flink-runtime-1.14.0.jar:1.14.0]

 at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source) ~[na:na]

 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[na:1.8.0_91]

 at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]

 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
 ~[na:na]

 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[na:na]

 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
 ~[na:na]

 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
 ~[na:na]

 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 ~[na:na]

 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[na:na]

 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[na:na]

 at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[scala-library-2.11.12.jar:na]

 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[scala-library-2.11.12.jar:na]

 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
~[na:na]

 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:na]

 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[scala-library-2.11.12.jar:na]

 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[scala-library-2.11.12.jar:na]

 at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[na:na]

 at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[na:na]

 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[na:na]

 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[na:na]

 at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[na:na]

 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[na:na]

 at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[na:na]

 at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[na:na]

 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
~[na:1.8.0_91]

 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
~[na:1.8.0_91]

 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
~[na:1.8.0_91]

 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 
~[na:1.8.0_91]

FYI, if we don't enable this feature we see a different error consistent with 
the older version of Flink:

INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator   Failed to 
trigger checkpoint for job 532b20d72d407bf82570c86f672ccf2c because Some tasks 
of the job have already finished and checkpointing with finished tasks is not 
enabled. Failure reason: Not all required tasks are currently running.

Can anyone advise if this feature is indeed available and working in 1.14.0 and 
how to correctly enable?

Thanks,

James.


From: Austin Cawley-Edwards 
Sent: 04 November 2021 18:46
To: James Sandys-Lumsdaine 
Cc: 

Re: Table DataStream Conversion Lost Watermark

2021-12-06 Thread Timo Walther

Hi Yunfeng,

it seems this is a deeper issue with the fromValues implementation. 
Under the hood, it still uses the deprecated InputFormat stack. And as 
far as I can see, there we don't emit a final MAX_WATERMARK. I will 
definitely forward this.


But toDataStream forwards watermarks correctly.

I hope this helps. Or do you think we should also rediscuss the 
fromDataStream watermark behavior?


Regards,
Timo


On 06.12.21 10:26, Yunfeng Zhou wrote:

Hi Timo,

Thanks for your response. I encountered another problem that might be 
relevant to the watermark as we discussed above.


In the test cases shown below. I would create a table from some data, 
convert it to datastream and do windowAll().reduce() on it. If we need 
to explicitly specify a `rowtime` metadata column in order to make the 
table pass timestamps to the converted datastream, then both the test 
cases should print out empty lists. In fact, one of them could print out 
a list with some data. The only difference between them is that I 
changed the value of some input data. This behavior can be reproduced 
under Flink ML's latest java environment and configurations.


Is this the expected behavior of `toDataStream`, or I have accidentally 
encountered a bug?


Best regards,
Yunfeng

```java

public class SimpleTest {
 @Test
public void testSimple1()throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table inputTable = tEnv.fromValues(
 DataTypes.ROW(
 DataTypes.FIELD("weight", DataTypes.DOUBLE()),
DataTypes.FIELD("f0", DataTypes.STRING()),
DataTypes.FIELD("f1", DataTypes.DOUBLE()),
DataTypes.FIELD("f2", DataTypes.DOUBLE()),
DataTypes.FIELD("f3", DataTypes.DOUBLE()),
DataTypes.FIELD("f4", DataTypes.INT()),
DataTypes.FIELD("label", DataTypes.STRING())
 ),
Row.of(1., "a", 1., 1., 1., 2, "l1"),
Row.of(1., "a", 1., 1., 1., 2, "l1"),
Row.of(1., "b", 0., 1., 1., 3, "l1"),
Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
Row.of(1., "a", 1., 1., 0., 1, "l0"),
Row.of(2., "d", 1., 1., 0., 1, "l0")
 );

DataStream input = tEnv.toDataStream(inputTable);

System.out.println(IteratorUtils.toList(input
 .windowAll(EndOfStreamWindows.get())
 .reduce((ReduceFunction) (row, t1) -> row)
 .executeAndCollect()
 ));
}


 @Test
public void testSimple2()throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table inputTable = tEnv.fromValues(
 DataTypes.ROW(
 DataTypes.FIELD("weight", DataTypes.DOUBLE()),
DataTypes.FIELD("f0", DataTypes.STRING()),
DataTypes.FIELD("f1", DataTypes.DOUBLE()),
DataTypes.FIELD("f2", DataTypes.DOUBLE()),
DataTypes.FIELD("f3", DataTypes.DOUBLE()),
DataTypes.FIELD("f4", DataTypes.INT()),
DataTypes.FIELD("label", DataTypes.STRING())
 ),
Row.of(1., "a", 1., 1., 1., 2, "l1"),
Row.of(1., "a", 1., 0., 1., 2, "l1"),
Row.of(1., "b", 0., 1., 1., 3, "l1"),
Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
Row.of(1., "a", 1., 1.5, 0., 1, "l0"),
Row.of(2., "d", 1., 1., 0., 1, "l0")
 );

DataStream input = tEnv.toDataStream(inputTable);

System.out.println(IteratorUtils.toList(input
 .windowAll(EndOfStreamWindows.get())
 .reduce((ReduceFunction) (row, t1) -> row)
 .executeAndCollect()
 ));
}
}

```

```java

/**
* A {@link WindowAssigner} that assigns all elements of a bounded input 
stream into one window

* pane. The results are emitted once the input stream has ended.
*/
public class EndOfStreamWindowsextends WindowAssigner {

 private static final EndOfStreamWindowsINSTANCE =new EndOfStreamWindows();

private EndOfStreamWindows() {}

 public static EndOfStreamWindowsget() {
 return INSTANCE;
}

 @Override
public CollectionassignWindows(
 Object element, long timestamp, WindowAssignerContext context) {
 return Collections.singletonList(new TimeWindow(Long.MIN_VALUE, 
Long.MAX_VALUE));
}

 @Override
public TriggergetDefaultTrigger(StreamExecutionEnvironment 
env) {
 return EventTimeTrigger.create();
}

 @Override
public StringtoString() {
 return "EndOfStreamWindows()";
}

 @Override
public TypeSerializergetWindowSerializer(ExecutionConfig 
executionConfig) {
 return new TimeWindow.Serializer();
}

 @Override
public boolean isEventTime() {
 return true;
}
}

```

On Fri, Nov 5, 2021 at 4:29 PM Timo Walther > wrote:


Hi Yunfeng,

by default the fromDataStream does not propagate watermarks into Table
API. Because Table API needs a time attribute in the schema that

flink cdc支持mysql整库同步进hudi湖吗?

2021-12-06 Thread casel.chen
flink cdc支持mysql整库同步进hudi湖吗?如果支持的话,希望能给一个例子,还要求能够支持schema变更。谢谢!

Re:Re: 关于flink on yarn 跨多hdfs集群访问的问题

2021-12-06 Thread casel.chen
如果是两套oss或s3 
bucket(每个bucket对应一组accessKey/secret)要怎么配置呢?例如写数据到bucketA,但checkpoint在bucketB

















在 2021-12-06 18:59:46,"Yang Wang"  写道:
>我觉得你可以尝试一下ship本地的hadoop conf,然后设置HADOOP_CONF_DIR环境变量的方式
>
>-yt /path/of/my-hadoop-conf
>-yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
>-yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
>
>
>Best,
>Yang
>
>chenqizhu  于2021年11月30日周二 上午10:00写道:
>
>> all,您好:
>>
>>  flink 1.13 版本支持了在flink-conf.yaml通过flink.hadoop.* 的方式
>> 配置hadoop属性。有个需求是将checkpoint写到装有ssd的hdfs(称之为集群B)以加速checkpoint写入速度,但这个hdfs集群不是flink客户端本地的默认hdfs(默认hdfs称为集群A),于是想通过在flink-conf.yaml里配置A、B两个集群的nameservices,类似与hdfs联邦模式,访问到两个hdfs集群,具体配置如下:
>>
>>
>>
>>
>> flink.hadoop.dfs.nameservices: ACluster,BCluster
>>
>> flink.hadoop.fs.defaultFS: hdfs://BCluster
>>
>>
>>
>>
>> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
>>
>> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
>>
>> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
>>
>> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
>>
>> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
>>
>> flink.hadoop.dfs.client.failover.proxy.provider.ACluster:
>> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>>
>>
>>
>>
>> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
>>
>> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
>>
>> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
>>
>> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
>>
>> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
>>
>> flink.hadoop.dfs.client.failover.proxy.provider.BCluster:
>> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>>
>>
>>
>>
>> 但在作业启动时候出错了,似乎是无法识别集群B的nameservices高可用配置,转而当成域名识别,具体报错如下:
>>
>> (将配置项改成flink客户端本地的默认hdfs集群A,则作业可正常启动 :flink.hadoop.fs.defaultFS:
>> hdfs://ACluster)
>>
>>
>>
>>
>> Caused by: BCluster
>>
>> java.net.UnknownHostException: BCluster
>>
>> at
>> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
>>
>> at
>> org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
>>
>> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:374)
>>
>> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:308)
>>
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
>>
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
>>
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
>>
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
>>
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
>>
>> at
>> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
>>
>> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
>>
>> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
>>
>> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
>>
>> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
>>
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
>>
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
>>
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
>>
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>> 对于以上问题,是否有解决方案?痛点是flink访问两个hdfs集群,最好是能通过flink-conf.yaml的配置实现。
>>
>>
>>
>>
>> 我的组件版本:
>>
>> flink : 1.13.3
>>
>> hadoop : 3.3.0
>>
>>
>>
>>
>> 期待回复,感谢!


Re: enable.auto.commit=true and checkpointing turned on

2021-12-06 Thread Vishal Santoshi
perfect. Thanks. That is what I imagined.

On Mon, Dec 6, 2021 at 2:04 AM Hang Ruan  wrote:

> Hi,
>
> 1. Yes, the kafka source will use the Kafka committed offset for the group
> id to start the job.
>
> 2. No, the auto.offset.reset
>   
> is
> from Kafka consumer config, which defines what to do when there is no
> initial offset in Kafka or if the current offset does not exist any more on
> the server. If the offset exists on the server, the consumer will still
> start from the committed offset.
>
> ps: If you enabled checkpointing, there is no need to enable
> enable.auto.commit. The offset will be committed to Kafka when checkpoints
> complete, which is the default behavior.
>
> Vishal Santoshi  于2021年12月4日周六 02:11写道:
>
>> Hello folks,
>>
>> 2 questions
>>  1. If we have enabled enable.auto.commit and enabled checkpointing and
>> we restart a flink application ( without checkpoint or savepoint ) , would
>> the kafka consumer start consuming from the last offset committed to kafka.
>>
>> 2. What if in the above scenario, we have "auto.offset.reset" set to
>> "latest". ? Would that ignore the consumer group offset in kafka ?
>>
>>
>> Regards.
>>
>


Re: [DISCUSS] Drop Zookeeper 3.4

2021-12-06 Thread Chesnay Schepler

ping @users; any input on how this would affect you is highly appreciated.

On 25/11/2021 22:39, Chesnay Schepler wrote:

I included the user ML in the thread.

@users Are you still using Zookeeper 3.4? If so, were you planning to 
upgrade Zookeeper in the near future?


I'm not sure about ZK compatibility, but we'd also upgrade Curator to 
5.x, which doesn't support ookeeperK 3.4 anymore.


On 25/11/2021 21:56, Till Rohrmann wrote:

Should we ask on the user mailing list whether anybody is still using
ZooKeeper 3.4 and thus needs support for this version or can a ZooKeeper
3.5/3.6 client talk to a ZooKeeper 3.4 cluster? I would expect that 
not a
lot of users depend on it but just to make sure that we aren't 
annoying a

lot of our users with this change. Apart from that +1 for removing it if
not a lot of user depend on it.

Cheers,
Till

On Wed, Nov 24, 2021 at 11:03 AM Matthias Pohl 
wrote:

Thanks for starting this discussion, Chesnay. +1 from my side. It's 
time to

move forward with the ZK support considering the EOL of 3.4 you already
mentioned. The benefits we gain from upgrading Curator to 5.x as a
consequence is another plus point. Just for reference on the 
inconsistent

state issue you mentioned: FLINK-24543 [1].

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-24543

On Wed, Nov 24, 2021 at 10:19 AM Chesnay Schepler 
wrote:


Hello,

I'd like to drop support for Zookeeper 3.4 in 1.15, upgrading the
default to 3.5 with an opt-in for 3.6.

Supporting Zookeeper 3.4 (which is already EOL) prevents us from
upgrading Curator to 5.x, which would allow us to properly fix an 
issue

with inconsistent state. It is also required to eventually support ZK

3.6.







Re: use of Scala versions >= 2.13 in Flink 1.15

2021-12-06 Thread Chesnay Schepler
With regards to the Java APIs, you will definitely be able to use the 
Java DataSet/DataStream APIs from Scala without any restrictions imposed 
by Flink. This is already working with the current SNAPSHOT version.


As we speak we are also working to achieve the same for the Table API; 
we expect to achieve that but with some caveats (i.e., if you use the 
Python API or the Hive connector then you still need to use the Scala 
version provided by Flink).


As for the Scala APIs, we haven't really decided yet how this will work 
in the future. However, one of the big benefits of the Scala-free 
runtime is that it should now be easier for us to release the APIs for 
more Scala versions.


On 06/12/2021 11:47, guenterh.lists wrote:

Dear list,

there have been some discussions and activities in the last months 
about a Scala free runtime which should make it possible to use newer 
Scala version (>= 2.13 / 3.x) on the application side.


Stephan Ewen announced the implementation is on the way [1] and 
Martijn Vissr mentioned in the ask me anything session on version 1.14 
that it is planned to make this possible in the upcoming 1.15 version 
(~ next February ) [2]


This would be very nice for our currently started project where we are 
discussing the used tools and infrastructure. "Personally" I would 
prefer that people with less experience on the JVM could make their 
start and first experiences with a "pythonized" Scala using the last 
versions of the language (2.13.x or maybe 3.x).


My question: Do you think your plans to provide the possibility of a 
Scala free runtime with the upcoming version is still realistic?


Out of curiosity: If you can make this possible and applications with 
current Scala versions are going to use the Java APIs of Flink what's 
the future of the current Scala API of Flink where you have to decide 
to use either Scala 2.11 or <2.12.8?

Is this then still possible as an alternative?

Thanks for some hints for our planning and decisions

Günter




[1] https://twitter.com/data_fly/status/1415012793347149830
[2] https://www.youtube.com/watch?v=wODmlow0ip0





Re: Re: Re: how to run streaming process after batch process is completed?

2021-12-06 Thread Yun Gao
Hi Joern,

Very thanks for sharing the cases! Could you also share a bit more on the 
detailed scenarios~?

Best,
Yun



 --Original Mail --
Sender:Joern Kottmann 
Send Date:Fri Dec 3 16:43:38 2021
Recipients:Yun Gao 
CC:vtygoss , Alexander Preuß , 
user@flink.apache.org 
Subject:Re: Re: how to run streaming process after batch process is completed?

Hello,

Are there plans to support checkpoints for batch mode? I currently load the 
state back via the DataStream API, but this gets more and more complicated and 
doesn't always lead to a perfect state restore (as flink could have done). 

This is one of my most wanted Flink features these days.

Regards,
Jörn



On Thu, Dec 2, 2021 at 9:24 AM Yun Gao  wrote:

Hi Vtygoss,

Very thanks for sharing the scenarios!

Currently for batch mode checkpoint is not support, thus it could not
create a snapshot after the job is finished. However, there might be some
alternative solutions:

1. Hybrid source [1] targets at allowing first read from a bounded source, then 
switch
to an unbounded source, which seems to work in this case. however, currently it 
might not
support the table / sql yet, which might be done in 1.15. 
2. The batch job might first write the result to an intermediate table, then 
for the unbounded
stream job, it might first load the table into state with DataStream API on 
startup or use dimension
join to continue processing new records. 

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
 --Original Mail --
Sender:vtygoss 
Send Date:Wed Dec 1 17:52:17 2021
Recipients:Alexander Preuß 
CC:user@flink.apache.org 
Subject:Re: how to run streaming process after batch process is completed?

Hi Alexander,

This is my ideal data pipeline. 
- 1. Sqoop transfer bounded data from database to hive. And I think flink batch 
process is more efficient than streaming process, so i want to process this 
bounded data in batch mode and write result in HiveTable2. 
- 2. There ares some tools to transfer CDC / BINLOG to kafka, and to write 
incremental unbounded data in HiveTable1.  I want to process this unbounded 
data in streaming mode and update incremental result in HiveTable2. 

So this is the problem. The flink streaming sql application cannot be restored 
from  batch process application. e.g. SQL: insert into table_2 select count(1) 
from table_1. In batch mode, the result stored in table_2 is N. And i expect 
that the accumulator number starts from N, not 0 when streaming process started.

Thanks for your reply. 

Best Regard!

(sending again because I accidentally left out the user ml in the reply on the 
first try)...
在 2021年11月30日 21:42,Alexander Preuß 写道:
Hi Vtygoss, 

Can you explain a bit more about your ideal pipeline? Is the batch data bounded 
data or could you also process it in streaming execution mode? And is the 
streaming data derived from the batch data or do you just want to ensure that 
the batch has been finished before running the processing of the streaming data?

Best Regards,
Alexander

(sending again because I accidentally left out the user ml in the reply on the 
first try)
On Tue, Nov 30, 2021 at 12:38 PM vtygoss  wrote:

Hi, community!

By Flink, I want to unify batch process and streaming process in data 
production pipeline. Batch process is used to process inventory data, then 
streaming process is used to process incremental data. But I meet a problem, 
there is no  state in batch and the result is error if i run stream process 
directly. 

So how to run streaming process accurately  after batch process is completed?   
Is there any doc or demo to handle this scenario?

Thanks for your any reply or suggestion!

Best Regards!






[DISCUSS] Strong read-after-write consistency of Flink FileSystems

2021-12-06 Thread David Morávek
Hi Everyone,

as outlined in FLIP-194 discussion [1], for the future directions of Flink
HA services, I'd like to verify my thoughts around guarantees of the
distributed filesystems used with Flink.

Currently some of the services (*JobGraphStore*, *CompletedCheckpointStore*)
are implemented using a combination of strongly consistent Metadata storage
(ZooKeeper, K8s CM) and the actual FileSystem. Reasoning behind this dates
back to days, when S3 was an eventually consistent FileSystem and we needed
a strongly consistent view of the data.

I did some research, and my feeling is that all the major FileSystems that
Flink supports already provide strong read-after-write consistency, which
would be sufficient to decrease a complexity of the current HA
implementations.

FileSystems that I've checked and that seem to support strong
read-after-write consistency:
- S3
- GCS
- Azure Blob Storage
- Aliyun OSS
- HDFS
- Minio

Are you aware of other FileSystems that are used with Flink? Do they
support the consistency that is required for starting a new initiatives
towards simpler / less error-prone HA services? Are you aware of any
problems with the above mentioned FileSystems that I might have missed?

I'm also bringing this up to user@f.a.o, to make sure we don't miss any
FileSystems.

[1] https://lists.apache.org/thread/wlzv02jqtq221kb8dnm82v4xj8tomd94

Best,
D.


Re: 关于flink on yarn 跨多hdfs集群访问的问题

2021-12-06 Thread Yang Wang
我觉得你可以尝试一下ship本地的hadoop conf,然后设置HADOOP_CONF_DIR环境变量的方式

-yt /path/of/my-hadoop-conf
-yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
-yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'


Best,
Yang

chenqizhu  于2021年11月30日周二 上午10:00写道:

> all,您好:
>
>  flink 1.13 版本支持了在flink-conf.yaml通过flink.hadoop.* 的方式
> 配置hadoop属性。有个需求是将checkpoint写到装有ssd的hdfs(称之为集群B)以加速checkpoint写入速度,但这个hdfs集群不是flink客户端本地的默认hdfs(默认hdfs称为集群A),于是想通过在flink-conf.yaml里配置A、B两个集群的nameservices,类似与hdfs联邦模式,访问到两个hdfs集群,具体配置如下:
>
>
>
>
> flink.hadoop.dfs.nameservices: ACluster,BCluster
>
> flink.hadoop.fs.defaultFS: hdfs://BCluster
>
>
>
>
> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
>
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
>
> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
>
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
>
> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
>
> flink.hadoop.dfs.client.failover.proxy.provider.ACluster:
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>
>
>
>
> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
>
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
>
> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
>
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
>
> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
>
> flink.hadoop.dfs.client.failover.proxy.provider.BCluster:
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>
>
>
>
> 但在作业启动时候出错了,似乎是无法识别集群B的nameservices高可用配置,转而当成域名识别,具体报错如下:
>
> (将配置项改成flink客户端本地的默认hdfs集群A,则作业可正常启动 :flink.hadoop.fs.defaultFS:
> hdfs://ACluster)
>
>
>
>
> Caused by: BCluster
>
> java.net.UnknownHostException: BCluster
>
> at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
>
> at
> org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
>
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:374)
>
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:308)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
>
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
>
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
>
> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
>
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
>
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
>
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> 对于以上问题,是否有解决方案?痛点是flink访问两个hdfs集群,最好是能通过flink-conf.yaml的配置实现。
>
>
>
>
> 我的组件版本:
>
> flink : 1.13.3
>
> hadoop : 3.3.0
>
>
>
>
> 期待回复,感谢!


use of Scala versions >= 2.13 in Flink 1.15

2021-12-06 Thread guenterh.lists

Dear list,

there have been some discussions and activities in the last months about 
a Scala free runtime which should make it possible to use newer Scala 
version (>= 2.13 / 3.x) on the application side.


Stephan Ewen announced the implementation is on the way [1] and Martijn 
Vissr mentioned in the ask me anything session on version 1.14 that it 
is planned to make this possible in the upcoming 1.15 version (~ next 
February ) [2]


This would be very nice for our currently started project where we are 
discussing the used tools and infrastructure. "Personally" I would 
prefer that people with less experience on the JVM could make their 
start and first experiences with a "pythonized" Scala using the last 
versions of the language (2.13.x or maybe 3.x).


My question: Do you think your plans to provide the possibility of a 
Scala free runtime with the upcoming version is still realistic?


Out of curiosity: If you can make this possible and applications with 
current Scala versions are going to use the Java APIs of Flink what's 
the future of the current Scala API of Flink where you have to decide to 
use either Scala 2.11 or <2.12.8?

Is this then still possible as an alternative?

Thanks for some hints for our planning and decisions

Günter




[1] https://twitter.com/data_fly/status/1415012793347149830
[2] https://www.youtube.com/watch?v=wODmlow0ip0

--
Günter Hipler
university library Leipzig



Converting DataStream of Avro SpecificRecord to Table

2021-12-06 Thread Dongwon Kim
Hi community,

I'm currently converting a DataStream of Avro SpecificRecord type into
Table using the following method:

public static  Table
toTable(StreamTableEnvironment tEnv,
   DataStream dataStream,
   Class cls) {
  RichMapFunction avroSpecific2RowConverter = new RichMapFunction<>() {
private transient AvroSerializationSchema avro2bin = null;
private transient AvroRowDeserializationSchema bin2row = null;

@Override
public void open(Configuration parameters) throws Exception {
  avro2bin = AvroSerializationSchema.forSpecific(cls);
  bin2row = new AvroRowDeserializationSchema(cls);
}

@Override
public Row map(T value) throws Exception {
  byte[] bytes = avro2bin.serialize(value);
  Row row = bin2row.deserialize(bytes);
  return row;
}
  };

  SingleOutputStreamOperator rows =
dataStream.map(avroSpecific2RowConverter)
// https://issues.apache.org/jira/browse/FLINK-23885
.returns(AvroSchemaConverter.convertToTypeInfo(cls));

  return tEnv.fromDataStream(rows);
}

I'm wondering whether there's a pre-defined utility for that or a better
way to do so in Flink-1.14.

Best,

Dongwon


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-06 Thread Yingjie Cao
Hi Till,

Thanks for your feedback.

>>> How will our tests be affected by these changes? Will Flink require
more resources and, thus, will it risk destabilizing our testing
infrastructure?

There are some tests that need to be adjusted, for example,
BlockingShuffleITCase. For other tests, theoretically, the influence should
be small. I will further run all tests multiple times (like 10 or 20) to
ensure that there is no test stability issues before making the change.

>>> I would propose to create a FLIP for these changes since you propose to
change the default behaviour. It can be a very short one, though.

Yes, you are right. I will prepare a simple FLIP soon.

Best,
Yingjie


Till Rohrmann  于2021年12月3日周五 18:39写道:

> Thanks for starting this discussion Yingjie,
>
> How will our tests be affected by these changes? Will Flink require more
> resources and, thus, will it risk destabilizing our testing infrastructure?
>
> I would propose to create a FLIP for these changes since you propose to
> change the default behaviour. It can be a very short one, though.
>
> Cheers,
> Till
>
> On Fri, Dec 3, 2021 at 10:02 AM Yingjie Cao 
> wrote:
>
>> Hi dev & users,
>>
>> We propose to change some default values of blocking shuffle to improve
>> the user out-of-box experience (not influence streaming). The default
>> values we want to change are as follows:
>>
>> 1. Data compression
>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>> default value is 'false'.  Usually, data compression can reduce both disk
>> and network IO which is good for performance. At the same time, it can save
>> storage space. We propose to change the default value to true.
>>
>> 2. Default shuffle implementation
>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>> both stability and performance. So we propose to reduce the default value
>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>> 1024 with a tpc-ds and 128 is the best one.)
>>
>> 3. Read buffer of sort-shuffle
>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>> default value is '32M'. Previously, when choosing the default value, both
>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>> way. However, recently, it is reported in the mailing list that the default
>> value is not enough which caused a buffer request timeout issue. We already
>> created a ticket to improve the behavior. At the same time, we propose to
>> increase this default value to '64M' which can also help.
>>
>> 4. Sort buffer size of sort-shuffle
>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>> value is '64' which means '64' network buffers (32k per buffer by default).
>> This default value is quite modest and the performance can be influenced.
>> We propose to increase this value to a larger one, for example, 512 (the
>> default TM and network buffer configuration can serve more than 10
>> result partitions concurrently).
>>
>> We already tested these default values together with tpc-ds benchmark in
>> a cluster and both the performance and stability improved a lot. These
>> changes can help to improve the out-of-box experience of blocking shuffle.
>> What do you think about these changes? Is there any concern? If there are
>> no objections, I will make these changes soon.
>>
>> Best,
>> Yingjie
>>
>


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-06 Thread Yingjie Cao
Hi Till,

Thanks for your feedback.

>>> How will our tests be affected by these changes? Will Flink require
more resources and, thus, will it risk destabilizing our testing
infrastructure?

There are some tests that need to be adjusted, for example,
BlockingShuffleITCase. For other tests, theoretically, the influence should
be small. I will further run all tests multiple times (like 10 or 20) to
ensure that there is no test stability issues before making the change.

>>> I would propose to create a FLIP for these changes since you propose to
change the default behaviour. It can be a very short one, though.

Yes, you are right. I will prepare a simple FLIP soon.

Best,
Yingjie


Till Rohrmann  于2021年12月3日周五 18:39写道:

> Thanks for starting this discussion Yingjie,
>
> How will our tests be affected by these changes? Will Flink require more
> resources and, thus, will it risk destabilizing our testing infrastructure?
>
> I would propose to create a FLIP for these changes since you propose to
> change the default behaviour. It can be a very short one, though.
>
> Cheers,
> Till
>
> On Fri, Dec 3, 2021 at 10:02 AM Yingjie Cao 
> wrote:
>
>> Hi dev & users,
>>
>> We propose to change some default values of blocking shuffle to improve
>> the user out-of-box experience (not influence streaming). The default
>> values we want to change are as follows:
>>
>> 1. Data compression
>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>> default value is 'false'.  Usually, data compression can reduce both disk
>> and network IO which is good for performance. At the same time, it can save
>> storage space. We propose to change the default value to true.
>>
>> 2. Default shuffle implementation
>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>> both stability and performance. So we propose to reduce the default value
>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>> 1024 with a tpc-ds and 128 is the best one.)
>>
>> 3. Read buffer of sort-shuffle
>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>> default value is '32M'. Previously, when choosing the default value, both
>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>> way. However, recently, it is reported in the mailing list that the default
>> value is not enough which caused a buffer request timeout issue. We already
>> created a ticket to improve the behavior. At the same time, we propose to
>> increase this default value to '64M' which can also help.
>>
>> 4. Sort buffer size of sort-shuffle
>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>> value is '64' which means '64' network buffers (32k per buffer by default).
>> This default value is quite modest and the performance can be influenced.
>> We propose to increase this value to a larger one, for example, 512 (the
>> default TM and network buffer configuration can serve more than 10
>> result partitions concurrently).
>>
>> We already tested these default values together with tpc-ds benchmark in
>> a cluster and both the performance and stability improved a lot. These
>> changes can help to improve the out-of-box experience of blocking shuffle.
>> What do you think about these changes? Is there any concern? If there are
>> no objections, I will make these changes soon.
>>
>> Best,
>> Yingjie
>>
>


Re: Order of events in Broadcast State

2021-12-06 Thread David Anderson
Event ordering in Flink is only maintained between pairs of events that
take exactly the same path through the execution graph. So if you
have multiple instances of A (let's call them A1 and A2), each broadcasting
a partition of the total rule space, then one instance of B (B1) might
receive rule1 from A1 before rule2 from A2, while B2 might receive rule2
before rule1.

If it fits your needs, one simple way to avoid having problems with this is
to broadcast from a task with a parallelism of 1. Then every downstream
instance will receive the broadcast stream in the same order.

David

On Sat, Dec 4, 2021 at 2:45 AM Alexey Trenikhun  wrote:

> [1] -
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/
> The Broadcast State Pattern | Apache Flink
> 
> The Broadcast State Pattern # In this section you will learn about how to
> use broadcast state in practise. Please refer to Stateful Stream Processing
> to learn about the concepts behind stateful stream processing. Provided
> APIs # To show the provided APIs, we will start with an example before
> presenting their full functionality. As our running example, we will use
> the case where we have a ...
> nightlies.apache.org
>
>
> --
> *From:* Alexey Trenikhun 
> *Sent:* Friday, December 3, 2021 4:33 PM
> *To:* Flink User Mail List 
> *Subject:* Order of events in Broadcast State
>
> Hello,
> Trying to understand what statement "Order of events in Broadcast State
> may differ across tasks" in [1] means.
> Let's say I have keyed function "A" which broadcasting stream of rules,
> KeyedBroadcastProcessFunction  "B" receives rules and updates broadcast
> state, like example in [1]. Let's say "A" broadcasts "rule 1" with name X,
> then "A" (same key) broadcasts "rule 2" with same name X, is there
> guarantee that eventually broadcast state will contain "rule 2" or since
> there is no ordering, B could receive "rule 2", then "rule 1" and broadcast
> state will end up with {X="rule 1"} forever ?
>
> Thanks,
> Alexey
>


Re: GCS/Object Storage Rate Limiting

2021-12-06 Thread David Morávek
Hi Kevin,

Flink comes with two schedulers for streaming:
- Default
- Adaptive (opt-in)

Adaptive is still in experimental phase and doesn't support local recover.
You're most likely using the first one, so you should be OK.

Can you elaborate on this a bit? We aren't changing the parallelism when
> restoring.
>

Splitting / merging of the rocksdb based operator checkpoint is currently
an expensive operation. If the parallelism remains unchanged, you should be
OK, the majority of time for the operator state restore will be spend on
download of the rocksdb snapshot.

Our checkpoint is about 900GB, and we have 256 TaskManagers with a
> parallelism of 512.
>

This could definitely generate lot of concurrent requests when restoring
the state.

Does the restore operation fail, or the retry mechanism is sufficient to
work around this?

D.

On Thu, Dec 2, 2021 at 7:54 PM Kevin Lam  wrote:

> HI David,
>
> Thanks for your response.
>
> What's the DefaultScheduler you're referring to? Is that available in
> Flink 1.13.1 (the version we are using)?
>
> How large is the state you're restoring from / how many TMs does the job
>> consume / what is the parallelism?
>
>
> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
> parallelism of 512.
>
> Also things could get even worse if the parallelism that has been used for
>> taking the checkpoint is different from the one you're trying to restore
>> with (especially with RocksDB).
>>
>
> Can you elaborate on this a bit? We aren't changing the parallelism when
> restoring.
>
> On Thu, Dec 2, 2021 at 10:48 AM David Morávek  wrote:
>
>> Hi Kevin,
>>
>> this happens only when the pipeline is started up from savepoint /
>> retained checkpoint right? Guessing from the "path" you've shared it seems
>> like a RockDB based retained checkpoint. In this case all task managers
>> need to pull state files from the object storage in order to restore. This
>> can indeed be a heavy operation especially when restore a large state with
>> high parallelism.
>>
>> Recovery from failure should be faster (with DefaultScheduler) as we can
>> re-use the local files that are already present on TaskManagers.
>>
>> How large is the state you're restoring from / how many TMs does the job
>> consume / what is the parallelism?
>>
>> Also things could get even worse if the parallelism that has been used
>> for taking the checkpoint is different from the one you're trying to
>> restore with (especially with RocksDB).
>>
>> Best,
>> D.
>>
>> On Thu, Dec 2, 2021 at 4:29 PM Kevin Lam  wrote:
>>
>>> Hi all,
>>>
>>> We're running a large (256 task managers with 4 task slots each) Flink
>>> Cluster with High Availability enabled, on Kubernetes, and use Google Cloud
>>> Storage (GCS) as our object storage for the HA metadata. In addition, our
>>> Flink application writes out to GCS from one of its sinks via streaming
>>> file sink + GCS connector.
>>>
>>> We observed the following types of errors when running our application:
>>>
>>> ```
>>>
>>> INFO: Encountered status code 429 when sending GET request to URL '
>>> https://storage.googleapis.com/download/storage/v1/b//o/checkpoints%2F%2Fshared%2F13721c52-18d8-4782-80ab-1ed8a15d9ad5?alt=media=1638448883568946'.
>>> Delegating to response handler for possible retry. [CONTEXT
>>> ratelimit_period="10 SECONDS [skipped: 8]" ]
>>>
>>> ```
>>>
>>> ```
>>>  INFO: Encountered status code 503 when sending POST request to URL '
>>> https://storage.googleapis.com/upload/storage/v1/b//o?uploadType=multipart'.
>>> Delegating to response handler for possible retry.
>>> ```
>>>
>>> They typically happen upon cluster start-up, when all the task managers
>>> are registering with the jobmanager. We've also seen them occur as a result
>>> of output from our sink operator as well.
>>>
>>> Has anyone else encountered similar issues? Any practices you can
>>> suggest?
>>>
>>> Advice appreciated!
>>>
>>> Thanks
>>>
>>


Re: Unable to create new native thread error

2021-12-06 Thread David Morávek
Hi Ilan,

I think so, using CLI instead of REST API should solve this, as the user
code execution would be pulled out to a separate JVM. If you're going to
try that, it would be great to hear back whether it has solved your issue.

As for 1.13.4, there is currently no on-going effort / concrete plan on the
release.

Best,
D.

On Sun, Dec 5, 2021 at 4:06 PM Ilan Huchansky 
wrote:

> Hi David,
>
>
>
> Thanks for your fast response.
>
>
>
> Do you think that changing the submission method could solve the problem?
> Using the CLI instead of the REST API.
>
>
>
> Another question, I see that the most critical issue (FLINK-25022) is in
> progress and should be released on with version 1.13.4 , do you know when
> this version is planned to be released?
>
>
>
> Thanks again,
>
> Ilan.
>
>
>
> *From: *David Morávek 
> *Date: *Thursday, 2 December 2021 at 17:25
> *To: *Ilan Huchansky 
> *Cc: *user@flink.apache.org , Start.io SDP <
> s...@start.io>
> *Subject: *Re: Unable to create new native thread error
>
> Hi Ilan,
>
>
>
> we are aware of multiple issues when web-submission can result in
> classloader / thread local leaks, which could potentially result in the
> behavior you're describing. We're working on addressing them.
>
>
>
> FLINK-25022 [1]: The most critical one leaking thread locals.
> FLINK-25027 [2]: Is only a memory improvement for a particular situation
> (a lot of small batch jobs) and could be fixed by accounting for when
> setting Metaspace size.
> FLINK-25023 [3]: Can leak the classloader of the first job submitted via
> rest API. (constant overhead for Metaspace)
>
>
>
> In general, web-submission is different from a normal submission in way,
> that the "main method" of the uploaded jar is executed on JobManager and
> it's really hard to isolate it's execution from possible side effects.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-25022
>
> [2] https://issues.apache.org/jira/browse/FLINK-25027
>
> [3] https://issues.apache.org/jira/browse/FLINK-25023
>
>
>
> Best,
>
> D.
>
>
>
> On Thu, Dec 2, 2021 at 3:51 PM Ilan Huchansky 
> wrote:
>
> *Hi Flink mailing list,*
>
>
>
> I am Ilan from Start.io data platform team, need some guidance.
>
>
>
> We have a flow with the following use case:
>
>
>
>- We read files from AWS S3 buckets process them on our cluster and
>sink the data into files using Flink file sink.
>- The jobs use always the same jar, we uploaded it to every job
>manager on the cluster.
>- We are submitting jobs constantly through the REST API.
>- Each job reads one or more files from S3.
>- The jobs can run from 20 seconds up to 3.5 hours.
>- The jobs run on batch mode
>- Running flink 1.13.1
>- We are running in cluster mode using docker, same machines are being
>used for task and job manager.
>
>
>
>  We are struggling with the same error, over and over again. We encounter
> it in the job manager and in the task manager.
>
>
>
> After a while that the cluster is running and jobs are finishing correctly
> the task and job manager fail to operate due to:
>
> Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
>
>
>
>
>
> We also see some sporadic failure of java.lang.NoClassDefFoundError, not
> sure it is related.
>
>
>
> Our set up and configuration are as follow:
>
> · 5 nodes cluster running on docker
>
> · Relevant memory config:
>
> jobmanager.memory.heap.size: 1600m
>
> taskmanager.memory.process.size: 231664m
>
> taskmanager.memory.network.fraction: 0.3
>
> taskmanager.memory.jvm-metaspace.size: 10g
>
> jobmanager.memory.jvm-metaspace.size: 2g
>
> taskmanager.memory.framework.off-heap.size: 1g
>
>
>
> · Host details
>
> max locked memory  (kbytes, -l) 65536
>
> max memory size   (kbytes, -m) unlimited
>
> open files (-n) 1024
>
> max user processes(-u) 1547269
>
> virtual memory   (kbytes, -v) unlimited
>
> file locks   (-x) unlimited
>
>
>
> cat /proc/sys/kernel/threads-max: 3094538
>
> kernel.pid_max = 57344
>
>
>
>
>
> We try to increase the max user processes, also to increase and decrease
> the jvm-metaspace.
>
>
>
> Should we keep increasing the max number of processes on the host, Is
> there a way to limit the number of threads from flink config?
>
>
>
> What should we do? Any insights?
> I can provide more information as needed.
>
>
>
> Thanks in advance
>
>
>
>  Ilan
>
>
>
>


????

2021-12-06 Thread lorthevan


Re: 退订

2021-12-06 Thread liber xue
退订

™薇维苿尉℃ 于2021年12月6日 周一16:20写道:

> 退订


????

2021-12-06 Thread ?6?4??????????


????

2021-12-06 Thread ?6?4??????????


Re: flink结合历史数据怎么处理

2021-12-06 Thread Leonard Xu
MySQL CDC connector 
支持并发读取的,读取过程也不会用锁,600万的数据量很小了,百亿级的分库分表我们和社区用户测试下都是ok的,你可以自己试试。

祝好,
Leonard


> 2021年12月6日 下午3:54,张阳 <705503...@qq.com.INVALID> 写道:
> 
> 因为数据量有600w 所以担心初始化时间太长 或者性能问题
> 
> 
> 
> 
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2021年12月6日(星期一) 下午2:38
> 收件人:"user-zh" 
> 主题:Re: flink结合历史数据怎么处理
> 
> 
> 
> 如果你的数据源是 数据库,你可以尝试下 Flink CDC Connectors[1], 这些Connector 就是 hybrid source, 
> 先读历史全量数据,再读增量数据,
> 历史和增量阶段是无缝衔接的。
> 
> 祝好,
> Leonard 
> [1] 
> https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/mysql-cdc.html
> 
> 
>  2021年12月2日 下午2:40,张阳   
>  统计的指标有大量的历史数据,怎么把历史的数据和今天的实时数据进行汇总呢。