Re: Question about Scala Case Class and List in Flink

2020-01-15 Thread Timo Walther

Hi,

Reg. 1:

Scala case classes are supported in the Scala specific version of the 
DataStream API. If you are using case classes in the Java API you will 
get the INFO below because the Java API uses pure reflection extraction 
for analyzing POJOs.


The Scala API tries to analyze Scala classes first; if this is not 
possible, it will fall back to Java reflection extraction. So in your case 
the INFO should not be present because it is a pure Scala case class. Is 
it used within a non-case class?


Reg 2:

Most classes can be serialized by Flink. That's why the log lines are 
just of type INFO; they only indicate that performance might be affected 
slightly. If you are performance sensitive, I would recommend primitive 
types, arrays and case classes.
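
For illustration, a minimal Scala sketch of this point (everything beyond the poster's
case class is an assumption): with the Scala DataStream API import in scope, the case
class is analyzed by the Scala type analyzer instead of falling back to a GenericType,
so the TypeExtractor INFO from the Java path should not appear.

import org.apache.flink.streaming.api.scala._ // provides the implicit TypeInformation derivation

case class SensorReading(var id: String, var timestamp: Long, var temperature: Double)

object SensorReadingTypeInfoSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // fromElements picks up the implicit TypeInformation generated for the case class
    val readings: DataStream[SensorReading] = env.fromElements(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_7", 1547718201L, 15.4))

    readings.keyBy(_.id).print()
    env.execute("case class TypeInformation sketch")
  }
}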


Regards,
Timo


On 15.01.20 03:47, Utopia wrote:

Hi folks,

I have two questions about types in Flink when using Scala:

*1. Scala case class:*

This is my case class definition:

case class SensorReading(var id: String, var timestamp: Long, var temperature: Double)



In the documentation, Scala case classes are supported:
`Scala /case classes/(including Scala tuples): null fields not supported`

But the log info shows:
10:26:08,489 INFO org.apache.flink.api.java.typeutils.TypeExtractor - 
class io.github.streamingwithflink.util.SensorReading is missing a 
default constructor so it cannot be used as a POJO type and must be 
processed as GenericType. Please read the Flink documentation on "Data 
Types & Serialization" for details of the effect on performance.



*2. Scala list*

This is my case class definition:

case class SensorReading(var id: String, var timestamp: Long, var temperature: Double, var list: List[String] = List[String]())

The log shows:
No fields were detected for class scala.collection.immutable.List so it 
cannot be used as a POJO type and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for 
details of the effect on performance.


Does it mean that a Scala list can be serialized, so I can ignore this 
info if I don't care about the performance?
Should I use a Java ArrayList instead of a Scala list, or create a custom 
serializer for the SensorReading case class?


Thanks!


Best  regards
Utopia




Filter with large key set

2020-01-15 Thread Jin Yi
Hi there,

I have the following usecase:
a key set, say [A,B,C,], with around 10M entries; the type of the entries
can be one of the types in BasicTypeInfo, e.g. String, Long, Integer, etc.

and each message looks like below:
message: {
   header: A
   body: {}
}

I would like to use Flink to filter on each message's header field, to see if
the value is present in the key set.

*The key set needs to be dynamic, meaning at any time, we can perform
add/read/delete operations on the key set. *

Any suggestions are very welcome!

Thanks a lot!
Eleanore
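
For illustration, a minimal Scala sketch of one common way to implement such a dynamic
filter, Flink's broadcast state pattern; the Message/KeyOp types, the bounded example
sources and all names below are assumptions, not part of the original question:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Message(header: String, body: String)
case class KeyOp(op: String, key: String) // op is "ADD" or "DELETE"

object DynamicKeySetFilterSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // broadcast state holding the current key set (the Boolean value is unused)
    val keySetDescriptor = new MapStateDescriptor[String, java.lang.Boolean](
      "key-set", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)

    // placeholders; in practice these would be Kafka (or other) sources
    val messages: DataStream[Message] = env.fromElements(Message("A", "{}"), Message("D", "{}"))
    val keyOps: DataStream[KeyOp] = env.fromElements(KeyOp("ADD", "A"), KeyOp("DELETE", "C"))

    val filtered = messages
      .connect(keyOps.broadcast(keySetDescriptor))
      .process(new BroadcastProcessFunction[Message, KeyOp, Message] {
        override def processElement(msg: Message, ctx: ReadOnlyContext,
                                    out: Collector[Message]): Unit = {
          // keep the message only if its header is currently in the key set
          if (ctx.getBroadcastState(keySetDescriptor).contains(msg.header)) {
            out.collect(msg)
          }
        }
        override def processBroadcastElement(op: KeyOp, ctx: Context,
                                             out: Collector[Message]): Unit = {
          val keySet = ctx.getBroadcastState(keySetDescriptor)
          if (op.op == "ADD") keySet.put(op.key, java.lang.Boolean.TRUE) else keySet.remove(op.key)
        }
      })

    filtered.print()
    env.execute("dynamic key-set filter sketch")
  }
}

Note that broadcast state is replicated to every parallel instance and kept on the heap,
so with ~10M entries it may be preferable to key both streams by the key value and use
keyed state, or to look the keys up in an external store, instead.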


Re: Re: MiniCluster problem

2020-01-15 Thread tison
Yes. MiniCluster starts the JM and TM in the same process; it is a cluster mainly used for testing.

Standalone means there is no resource management framework such as YARN involved; the TMs are started manually by the user, and it is a cluster that can be used in production.

Best,
tison.



Re: Re: MiniCluster problem

2020-01-15 Thread 郑 洁锋
So can I understand this as a local cluster, i.e. it starts a Flink cluster by itself (the current machine simulating a distributed environment), with no need to deploy a separate Flink cluster?


zjfpla...@hotmail.com


Re: Re: MiniCluster problem

2020-01-15 Thread tison
You are mixing several concepts together here. MiniCluster is itself a cluster, an internal deployment mode, and a concept parallel to standalone. I cannot tell what you are trying to achieve, but as for your problem, as I said above, you created the MiniCluster with new but never started it. I am not sure whether that is the effect you want, though; MiniCluster and standalone are two parallel things.

Best,
tison.



Re: Re: MiniCluster problem

2020-01-15 Thread 郑 洁锋
Because it looked like standalone on my side, and I saw that the official Flink documentation also lists a standalone mode among the deployment modes, I deployed following that mode and then tried it.


zjfpla...@hotmail.com



Re: Re: MiniCluster problem

2020-01-15 Thread 郑 洁锋
Here is the complete code, up to the point where it is started:

public class ClusterClientFactory {

public static ClusterClient createClusterClient(Options launcherOptions) 
throws Exception {
String mode = launcherOptions.getMode();
if(mode.equals(ClusterMode.standalone.name())) {
return createStandaloneClient(launcherOptions);
} else if(mode.equals(ClusterMode.yarn.name())) {
return createYarnClient(launcherOptions,mode);
}
throw new IllegalArgumentException("Unsupported cluster client type: ");
}

public static ClusterClient createStandaloneClient(Options launcherOptions) 
throws Exception {
String flinkConfDir = launcherOptions.getFlinkconf();
Configuration config = 
GlobalConfiguration.loadConfiguration(flinkConfDir);
MiniClusterConfiguration.Builder configBuilder = new 
MiniClusterConfiguration.Builder();
configBuilder.setConfiguration(config);
MiniCluster miniCluster = new MiniCluster(configBuilder.build());
MiniClusterClient clusterClient = new MiniClusterClient(config, 
miniCluster);
LeaderConnectionInfo connectionInfo = 
clusterClient.getClusterConnectionInfo();
InetSocketAddress address = 
AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
config.setString(JobManagerOptions.ADDRESS, 
address.getAddress().getHostName());
config.setInteger(JobManagerOptions.PORT, address.getPort());
clusterClient.setDetached(true);
return clusterClient;
}


In the launcher class:

ClusterClient clusterClient = 
ClusterClientFactory.createClusterClient(launcherOptions);
clusterClient.run(program, 1);
clusterClient.shutdown();


zjfpla...@hotmail.com



Re: MiniCluster problem

2020-01-15 Thread Jin Yi
Hi

You can refer to org.apache.flink.streaming.api.environment.LocalStreamEnvironment::execute:

public JobExecutionResult execute(String jobName) throws Exception {
  // transform the streaming program into a JobGraph
  StreamGraph streamGraph = getStreamGraph();
  streamGraph.setJobName(jobName);

  JobGraph jobGraph = streamGraph.getJobGraph();
  jobGraph.setAllowQueuedScheduling(true);

  Configuration configuration = new Configuration();
  configuration.addAll(jobGraph.getJobConfiguration());
  configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

  // add (and override) the settings with what the user defined
  configuration.addAll(this.configuration);

  if (!configuration.contains(RestOptions.BIND_PORT)) {
   configuration.setString(RestOptions.BIND_PORT, "0");
  }

  int numSlotsPerTaskManager =
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS,
jobGraph.getMaximumParallelism());

  MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
   .setConfiguration(configuration)
   .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
   .build();

  if (LOG.isInfoEnabled()) {
   LOG.info("Running job on local embedded Flink mini cluster");
  }

  MiniCluster miniCluster = new MiniCluster(cfg);

  try {
   miniCluster.start();
   configuration.setInteger(RestOptions.PORT,
miniCluster.getRestAddress().get().getPort());

   return miniCluster.executeJobBlocking(jobGraph);
  }
  finally {
   transformations.clear();
   miniCluster.close();
  }
 }
}

Best,
Eleanore



Re: Re: MiniCluster problem

2020-01-15 Thread tison
MiniCluster miniCluster = new MiniCluster(configBuilder.build());

miniCluster.start();

MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);

Best,
tison.




Re: Re: MiniCluster problem

2020-01-15 Thread tison
It has nothing to do with the cluster.
Best,
tison.




Re: Re: MiniCluster problem

2020-01-15 Thread tison
1. This is an internal implementation class. The documentation is an issue, but it is best not to depend on it directly.

2. ? Where is the code snippet in your screenshot? As far as I can see, it is created and used by your own code.

Best,
tison.




Re: Re: MiniCluster problem

2020-01-15 Thread 郑 洁锋
Is there any documentation for MiniCluster? I don't see anything about it under the deployment modes in the official documentation.
I started the Flink standalone cluster via bin/start-cluster.sh.



zjfpla...@hotmail.com


Re: Flink Sql Join, how to clear the sql join state?

2020-01-15 Thread Benchao Li
Hi LakeShen,

Maybe "Idle State Retention Time"[1] may help in your case.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
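
For reference, a minimal Scala sketch of that configuration in the Flink 1.9 style
described in [1]; the retention times, table names and query are placeholders:

import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment

object IdleStateRetentionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // state for a join key is cleared once the key has been idle
    // for at least 12 hours and at most 24 hours
    val qConfig = tableEnv.queryConfig
    qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

    // register the input tables, run the inner join, and pass qConfig when the result
    // is emitted, e.g. tableEnv.toRetractStream[org.apache.flink.types.Row](result, qConfig)
  }
}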

LakeShen wrote on Thursday, January 16, 2020 at 10:15 AM:

> Hi community,now I am use flink sql inner join in my code,I saw the flink
> document, the flink sql inner join will keep both sides of the join input
> in Flink’s state forever.
> As result , the hdfs files size are so big , is there any way to clear the
> sql join state?
> Thanks to your reply.
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: MiniCluster problem

2020-01-15 Thread tison
You need to start your MiniCluster (x

Best,
tison.



MiniCluster problem

2020-01-15 Thread 郑 洁锋
The following error is reported while executing the MiniCluster code:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
Exception in thread "main" java.lang.IllegalStateException: MiniCluster is not 
yet running.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at 
org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
at 
org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
at 
com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
at 
com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
at 
com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)

The code section where the error occurs is as follows:

Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
MiniClusterConfiguration.Builder configBuilder = new 
MiniClusterConfiguration.Builder();
configBuilder.setConfiguration(config);
MiniCluster miniCluster = new MiniCluster(configBuilder.build());
MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);

where flinkConfDir is /opt/flink/conf


The Flink standalone HA cluster information is as follows:
[inline image omitted]

zjfpla...@hotmail.com



Re: How to handle startup for mandatory config parameters?

2020-01-15 Thread Yang Wang
Hi John,

Most of the config options have default values. However, you still
need to specify some required fields, for example the TaskManager
resource related options. If you do not specify any of them, an
exception will be thrown on the client side like the following.

Exception in thread "main"
org.apache.flink.configuration.IllegalConfigurationException: Either Task
Heap Memory size (taskmanager.memory.task.heap.size) and Managed Memory
size (taskmanager.memory.managed.size), or Total Flink Memory size
(taskmanager.memory.flink.size), or Total Process Memory size
(taskmanager.memory.process.size) need to be configured explicitly.
at
org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:149)
at
org.apache.flink.runtime.util.BashJavaUtils.getTmResourceJvmParams(BashJavaUtils.java:62)
at org.apache.flink.runtime.util.BashJavaUtils.main(BashJavaUtils.java:46)


Also, when you deploy Flink on a YARN cluster, it will check the queue
configuration, resources, etc.
If a config exception is thrown during startup, the Flink client will fail
and print the exception on the console and in the client logs
(usually in the {FLINK_HOME}/logs directory).

However, not all config options can be checked on the client side.
For example, if you set a wrong checkpoint path, then you need to find
the exceptions or errors in the JobManager logs.
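
For the user-defined parameters asked about in the original mail, a minimal Scala sketch
of failing fast in main(); the parameter names are only examples. Since main() normally
runs on the client when the job is submitted, a missing key surfaces in the client output
and logs rather than on the task managers:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object MandatoryConfigSketch {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)

    // getRequired throws a RuntimeException if the key is missing,
    // so the job is never submitted with an incomplete configuration
    val bootstrapServers = params.getRequired("bootstrap.servers")
    val outputTopic = params.getRequired("output-topic")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // make the validated parameters visible in the web UI and to operators
    env.getConfig.setGlobalJobParameters(params)

    env.fromElements(bootstrapServers, outputTopic).print()
    env.execute("mandatory config sketch")
  }
}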



Best,
Yang

John Smith wrote on Thursday, January 16, 2020 at 12:38 AM:

> Hi, so I have no problem reading config from resources files or anything
> like that...
>
> But my question is around how do we handle mandatory fields?
>
> 1- If a mandatory field is missing during startup... Do we just "log" it
> and do System.exit()?
> 2- If we do log it where does the log end up, the task or the job node?
>


Flink Sql Join, how to clear the sql join state?

2020-01-15 Thread LakeShen
Hi community, I am now using a Flink SQL inner join in my code. I saw in the
Flink documentation that the Flink SQL inner join will keep both sides of the
join input in Flink’s state forever.
As a result, the HDFS files are getting very big; is there any way to clear the
SQL join state?
Thanks for your reply.


Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

2020-01-15 Thread Yun Tang
Hi

The root cause is a checkpoint error caused by failing to send data to Kafka during 
'preCommit'. The right solution is to avoid the failing Kafka writes in the first place, 
which is in the scope of Kafka.

If you cannot ensure the state of Kafka with its client and do not require exactly-once, 
you can pass FlinkKafkaProducer.Semantic.NONE when creating the Kafka producer to disable 
sending data during 'preCommit'.

If you don't want the job to fail due to checkpoint errors, you can increase the 
tolerable number of declined checkpoints:

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
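
For illustration, a minimal Scala sketch combining both options; the topic, properties,
schema and numbers are placeholders, and the constructor used is the one taking an
explicit Semantic in the universal Kafka connector:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object KafkaSinkSemanticSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60000)
    // tolerate a few declined/failed checkpoints before failing the job
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker:9092")

    // Semantic.NONE does no flushing in 'preCommit', so a slow Kafka broker cannot
    // fail the checkpoint, at the price of possible data loss or duplication
    val producer = new FlinkKafkaProducer[String](
      "output-topic",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      props,
      FlinkKafkaProducer.Semantic.NONE)

    env.fromElements("a", "b", "c").addSink(producer)
    env.execute("kafka sink semantic sketch")
  }
}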

Best
Yun Tang

From: jose farfan 
Sent: Wednesday, January 15, 2020 23:21
To: ouywl 
Cc: user ; user...@flink.apache.org 

Subject: Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The 
streamTask checkpoint error .

Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl <ou...@139.com> wrote:
Hi all:
When I use Flink 1.9.1 and produce data to Kafka 1.1.1, the error below (Log-1) happens; 
the code is:

input.addSink(
new FlinkKafkaProducer(
parameterTool.getRequired("bootstrap.servers"),
parameterTool.getRequired("output-topic"),
new KafkaEventDeSchema()));

Log-1:
2020-01-09 09:13:44,476 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
2020-01-09 09:15:33,069 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 
1 by task f643244ff791dbd3fbfb88bfafdf1872 of job 
d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ 
producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
2020-01-09 09:15:33,070 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was 
declined.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 
ms has passed since batch creation
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
at 


Re: PubSub source throwing grpc errors

2020-01-15 Thread Richard Deurwaarder
Hi Itamar and Till,

Yes this actually looks a lot worse than it is, fortunately.

From what I understand this means: something has not released or properly
shut down a grpc client, and the library likes to inform you about this. I
would definitely expect to see this if the job crashes at the 'wrong' point.

As you can see in the issue, they did fix or change this at some point. Do
you have something to reproduce this, in particular how or when
serialization causes issues? I'll try updating the libraries and see if
that removes the verbose logs.

Regards,

Richard



On Wed, Jan 15, 2020 at 5:37 PM Till Rohrmann  wrote:

> Hi Itamar,
>
> could you share a bit more details about the serialization problem. Which
> class is not serializable and where does it originate from?
>
> Cheers,
> Till
>
> On Tue, Jan 14, 2020 at 9:47 PM Itamar Syn-Hershko <
> ita...@bigdataboutique.com> wrote:
>
>> Thanks!
>>
>> I was able to track this down. Essentially it was a deserialization error
>> which propagated and might have prevented the channel from closing down
>> properly. This could be considered as a problem, but I'm not further down
>> the rabbit hole chasing down a solution for the original deserialization
>> issue.
>>
>> Thanks for the help!
>>
>> On Tue, Jan 14, 2020 at 8:26 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Itamar,
>>>
>>> for further debugging it would be helpful to get the full logs of Flink
>>> and more information about your environment. Since I'm not too
>>> familiar with Flink's PubSub connector, I have pulled in Richard (original
>>> author), Becket and Robert (both helped with reviewing and merging this
>>> connector). They might know what's going on.
>>>
>>> The problem looks a bit similar to [1]. Maybe it would help to upgrade
>>> to a newer google-cloud-pubsub version than 1.62.0. I assume that the
>>> others might know more about it.
>>>
>>> [1] https://github.com/googleapis/google-cloud-java/issues/3648
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jan 13, 2020 at 12:19 PM Itamar Syn-Hershko <
>>> ita...@bigdataboutique.com> wrote:
>>>
 Hi all,

 We are trying to use the PubSub source with a very minimal and basic
 Flink application as a POC, and getting the following error consistently
 every couple of seconds. What am I missing?

 ```
 io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
 cleanQueue
 SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=
 pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
 Make sure to call shutdown()/shutdownNow() and wait until
 awaitTermination() returns true.
 java.lang.RuntimeException: ManagedChannel allocation site
 at
 io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
 at
 io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
 at
 io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
 at
 io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:419)
 at
 org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:55)
 at
 org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:178)
 at
 org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:100)
 at
 org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.base/java.lang.Thread.run(Thread.java:834)
 ```

 Thanks!

 --

 [image: logo] 
 Itamar Syn-Hershko


 ita...@bigdataboutique.com
 https://bigdataboutique.com
 
 
 

>>>
>>
>> --
>>
>> [image: logo] 
>> Itamar Syn-Hershko
>> CTO, Founder
>> +972-54-2467860
>> ita...@bigdataboutique.com
>> https://bigdataboutique.com
>> 
>> 
>> 
>>
>


Re: Fail to deploy flink on k8s in minikube

2020-01-15 Thread Jin Yi
Hi Jary,

From the Flink website, it supports the Flink job cluster deployment strategy
on Kubernetes:
https://github.com/apache/flink/blob/release-1.9/flink-container/kubernetes/README.md#deploy-flink-job-cluster

Best
Eleanore

On Wed, Jan 15, 2020 at 3:18 AM Jary Zhen  wrote:

> Thanks to YangWang and 刘建刚, this message is helpful for me too.
> Besides, which Flink version can be deployed on k8s?
>
> On Mon, 13 Jan 2020 at 13:51, 刘建刚  wrote:
>
> > Thank you for your help.
> >
> > Yang Wang wrote on Mon, Jan 13, 2020 at 12:53 PM:
> >
> >> Hi, Jiangang
> >>
> >> Glad to hear that you are looking to run Flink on Kubernetes.
> >>
> >> It is just because you are using the new Kubernetes version. The
> >> extensions/v1beta1
> >> API has been removed since v1.16. Please use apps/v1 instead. apps/v1 has been
> >> available
> >> since v1.9.0. I will create a ticket to fix the documentation.
> >>
> >> Before release-1.10, you could use standalone per-job[1] or standalone
> >> session[2] cluster on
> >> K8s. There are some existing K8s operators to manage the application
> >> lifecycle(e.g. google flink-on-k8s-operator[3],
> >> lyft flink-k8s-operator[4]).
> >>
> >> Running Flink native on K8s is supported from 1.10. You could find it
> >> here [5]. It aims to make
> >> it more convenient for Flink users to deploy Flink workloads on a K8s cluster.
> >> However, we only support
> >> session cluster now. The per-job mode is in development.
> >>
> >> [1]
> >>
> https://github.com/apache/flink/blob/release-1.9/flink-container/kubernetes/README.md#deploy-flink-job-cluster
> >> [2]
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#flink-session-cluster-on-kubernetes
> >>
> >>
> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
> >> [3] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> >> [4] https://github.com/lyft/flinkk8soperator
> >> [5]
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
> >>
> >> Best,
> >> Yang
> >>
> >> 刘建刚 wrote on Mon, Jan 13, 2020 at 12:14 PM:
> >>
> >>>   I fail to deploy flink on k8s referring to
> >>>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html
> >>>   When I run the command 'kubectl create -f
> >>> jobmanager-deployment.yaml', the following error is reported:
> >>> [image: image.png]
> >>>   I am new to k8s. Our team wants to deploy flink on k8s. Can anyone
> >>> help me resolve this issue? Can anyone give me some tutorial about k8s
> and
> >>> flink in production? Thank you very much.
> >>>
> >>
>


How to handle startup for mandatory config parameters?

2020-01-15 Thread John Smith
Hi, so I have no problem reading config from resource files or anything
like that...

But my question is around how do we handle mandatory fields?

1- If a mandatory field is missing during startup... Do we just "log" it
and do System.exit()?
2- If we do log it where does the log end up, the task or the job node?
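
For what it's worth, a minimal fail-fast sketch of the validation discussed in 1- (the parameter names here are made up): doing the check in main() with ParameterTool makes a missing value abort the submission with a clear message instead of failing later inside a running task.

```java
public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);

    // getRequired() throws a RuntimeException naming the missing key,
    // so the job is never submitted with an incomplete configuration
    String brokers = params.getRequired("bootstrap.servers"); // hypothetical keys
    String topic = params.getRequired("input.topic");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // also makes the parameters visible in the web UI
    env.getConfig().setGlobalJobParameters(params);

    // ... build the pipeline with the validated values ...
    env.execute("validated-job");
}
```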


Re: PubSub source throwing grpc errors

2020-01-15 Thread Till Rohrmann
Hi Itamar,

could you share a bit more details about the serialization problem. Which
class is not serializable and where does it originate from?

Cheers,
Till

On Tue, Jan 14, 2020 at 9:47 PM Itamar Syn-Hershko <
ita...@bigdataboutique.com> wrote:

> Thanks!
>
> I was able to track this down. Essentially it was a deserialization error
> which propagated and might have prevented the channel from closing down
> properly. This could be considered a problem, but I'm now further down
> the rabbit hole chasing down a solution for the original deserialization
> issue.
>
> Thanks for the help!
>
> On Tue, Jan 14, 2020 at 8:26 PM Till Rohrmann 
> wrote:
>
>> Hi Itamar,
>>
>> for further debugging it would be helpful to get the full logs of Flink
>> and more information about your environment. Since I'm not too
>> familiar with Flink's PubSub connector, I have pulled in Richard (original
>> author), Becket and Robert (both helped with reviewing and merging this
>> connector). They might know what's going on.
>>
>> The problem looks a bit similar to [1]. Maybe it would help to upgrade to
>> a newer google-cloud-pubsub version than 1.62.0. I assume that the others
>> might know more about it.
>>
>> [1] https://github.com/googleapis/google-cloud-java/issues/3648
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 13, 2020 at 12:19 PM Itamar Syn-Hershko <
>> ita...@bigdataboutique.com> wrote:
>>
>>> Hi all,
>>>
>>> We are trying to use the PubSub source with a very minimal and basic
>>> Flink application as a POC, and getting the following error consistently
>>> every couple of seconds. What am I missing?
>>>
>>> ```
>>> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
>>> cleanQueue
>>> SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=
>>> pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
>>> Make sure to call shutdown()/shutdownNow() and wait until
>>> awaitTermination() returns true.
>>> java.lang.RuntimeException: ManagedChannel allocation site
>>> at
>>> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.(ManagedChannelOrphanWrapper.java:103)
>>> at
>>> io.grpc.internal.ManagedChannelOrphanWrapper.(ManagedChannelOrphanWrapper.java:53)
>>> at
>>> io.grpc.internal.ManagedChannelOrphanWrapper.(ManagedChannelOrphanWrapper.java:44)
>>> at
>>> io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:419)
>>> at
>>> org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:55)
>>> at
>>> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:178)
>>> at
>>> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:100)
>>> at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>> ```
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> [image: logo] 
>>> Itamar Syn-Hershko
>>>
>>>
>>> ita...@bigdataboutique.com
>>> https://bigdataboutique.com
>>> 
>>> 
>>> 
>>>
>>
>
> --
>
> [image: logo] 
> Itamar Syn-Hershko
> CTO, Founder
> +972-54-2467860
> ita...@bigdataboutique.com
> https://bigdataboutique.com
> 
> 
> 
>


Re: Flink task node shut it self off.

2020-01-15 Thread John Smith
Hi, so far it seems stable.

On Mon, 6 Jan 2020 at 14:16, John Smith  wrote:

> So I increased all the jobs to 1 minute checkpointing... I'll let you know how
> it goes... Or if I need to rethink gluster lol
>
> On Sat., Jan. 4, 2020, 9:27 p.m. John Smith, 
> wrote:
>
>> It seems to have happened again... Here is a screen shot of the system
>> metrics for that day on that particular node
>>
>> https://www.dropbox.com/s/iudn7z2fvvy7vb8/flink-node.png?dl=0
>>
>>
>> On Fri, 3 Jan 2020 at 12:19, John Smith  wrote:
>>
>>> Well there was this huge IO wait like over 140% spike. IO wait rose
>>> slowly for couple hours then at some time it spiked at 140% and then after
>>> IO wait dropped back to "normal" the CPU 1min 5min 15min spiked to like 3
>>> times the number of cores for a bit.
>>>
>>> We were at "peak" operation, i.e. we were running a batch job when this
>>> happened. In average operation the "business" requests per second from our
>>> services are about 15 RPS; when we do batches we can hit 600 RPS for a few
>>> hours and then back down. Each business request underneath does a few round
>>> trips back and forth between Kafka, cache systems, Flink, DBs, etc... So
>>> Flink jobs are a subset of some parts of that 600 RPS.
>>>
>>> On the Flink side we have 3 task managers of 4 cores / 8GB which are configured as
>>> 8 slots, 5.4GB JVM, 3.77GB flink managed mem per task manager. We have 8
>>> jobs and 9 slots free. So the cluster isn't full yet. But we do see one
>>> node is full.
>>>
>>> We use disk FS state (backed by GlusterFS), not RocksDB. We had enabled
>>> 5 second checkpointing for 6 of the jobs... So just wondering if that was
>>> possibly the reason for the IO wait... But regardless of the RPS mentioned
>>> above the jobs will always checkpoint every 5 seconds... I had the chance
>>> to increase checkpointing for a few of the jobs before the holidays. I am
>>> back on Monday...
>>>
>>> On Fri., Jan. 3, 2020, 11:16 a.m. Chesnay Schepler, 
>>> wrote:
>>>
 The logs show 2 interesting pieces of information:

 
 ...
 2019-12-19 18:33:23,278 INFO
 org.apache.kafka.clients.FetchSessionHandler  - [Consumer
 clientId=consumer-4, groupId=ccdb-prod-import] Error sending fetch
 request (sessionId=INVALID, epoch=INITIAL) to node 0:
 org.apache.kafka.common.errors.DisconnectException.
 ...
 2019-12-19 19:37:06,732 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
 resolve ResourceManager address 
 akka.tcp://flink@xx-job-0002:36835/user/resourcemanager,
 retrying in 1 ms: Ask timed out on
 [ActorSelection[Anchor(akka.tcp://flink@xx-job-0002:36835/),
 Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
 of type "akka.actor.Identify"..

 This reads like the machine lost network connectivity for some reason.
 The tasks start failing because kafka cannot be reached, and the TM then
 shuts down because it cannot reach the ResourceManager either.

 On 25/12/2019 04:34, Zhijiang wrote:

 If you use the RocksDB state backend, it might consume extra native memory.
 Some resource frameworks, like YARN, will kill the container if
 the memory usage exceeds some threshold. You can also double check whether
 that is the case in your setup.

 --
 From:John Smith  
 Send Time:2019 Dec. 25 (Wed.) 03:40
 To:Zhijiang  
 Cc:user  
 Subject:Re: Flink task node shut it self off.

 The shutdown happened after the massive IO wait. I don't use any state
 Checkpoints are disk based...

 On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang, 
 wrote:
 Hi John,

 Thanks for the positive comments on Flink usage. No matter whether you use
 at-least-once or exactly-once for checkpointing, it will never lose
 a message during failure recovery.

 Unfortunately I cannot view the logs you posted. Generally speaking, a
 longer checkpoint interval means replaying more source data after
 failure recovery.
 In my experience a 5 second checkpoint interval is too
 frequent, and you might increase it to 1 minute or so.
 You can also monitor how long checkpoints take to complete in your
 application, and then adjust the interval accordingly.

 Concerning the node shutdown you mentioned, I am not quite sure
 whether it is related to your short checkpoint interval. Did you configure the
 heap state backend?  The hs_err file indicates that your job
 encountered a memory issue, so it is better to somehow increase
 your task manager memory. But if you can analyze the hs_err dump file with
 a profiling tool to check the memory usage, it might be more helpful
 for finding the root cause.

 Best,
 Zhijiang
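
A small sketch of the interval tuning suggested above, with example values only; the exact numbers should follow the measured checkpoint duration:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint once per minute instead of every 5 seconds
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

// leave breathing room between checkpoints and avoid overlapping ones
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000L);
```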

 

Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

2020-01-15 Thread jose farfan
Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl  wrote:

> Hi all:
> When I use flink 1.9.1 and produce data to Kafka 1.1.1, the error
> happens as in *log-1*; the code is:
>
> input.addSink(
> new FlinkKafkaProducer(
> parameterTool.getRequired("bootstrap.servers"),
> parameterTool.getRequired("output-topic"),
> new KafkaEventDeSchema()));
>
>
> *Log-1:*
> 2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job
> d8827b3f4165b6ba27c8b59c7aa1a400.
> 2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Decline checkpoint 1 by task
> f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400
> at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j
> (dataPort=33361).
> 2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Discarding checkpoint 1 of job
> d8827b3f4165b6ba27c8b59c7aa1a400.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason:
> Checkpoint was declined.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .snapshotState(AbstractStreamOperator.java:431)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask
> .java:1282)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:
> 1216)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .checkpointState(StreamTask.java:872)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .performCheckpoint(StreamTask.java:777)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpointOnBarrier(StreamTask.java:708)
> at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
> .notifyCheckpoint(CheckpointBarrierHandler.java:88)
> at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner
> .processBarrier(CheckpointBarrierAligner.java:113)
> at org.apache.flink.streaming.runtime.io.CheckpointedInputGate
> .pollNext(CheckpointedInputGate.java:155)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .pollNextNullable(StreamTaskNetworkInput.java:102)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .pollNextNullable(StreamTaskNetworkInput.java:47)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:135)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:279)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:301)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:406)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:
> 120018 ms has passed since batch creation
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .checkErroneous(FlinkKafkaProducer.java:1196)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .flush(FlinkKafkaProducer.java:968)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .preCommit(FlinkKafkaProducer.java:892)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .preCommit(FlinkKafkaProducer.java:98)
> at org.apache.flink.streaming.api.functions.sink.
> TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:
> 311)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> .snapshotState(FlinkKafkaProducer.java:973)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> .trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> .snapshotFunctionState(StreamingFunctionUtils.java:99)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> .snapshotState(AbstractUdfStreamOperator.java:90)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .snapshotState(AbstractStreamOperator.java:399)
> ... 17 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58
> record(s) for k8s-test-data-0:120018 ms has passed since batch creation
> 2020-01-09 09:15:33,074 INFO org.apache.flink.runtime.executiongraph.
> ExecutionGraph - Job producer data frequece
> (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
> org.apache.flink.util.FlinkRuntimeException: Exceeded 
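
Regarding the "Expiring ... record(s)" failure above: batch expiry is governed by the Kafka producer's timeout settings, so while the root cause (usually broker reachability or throughput) is investigated, one mitigation is to pass more generous producer properties through the Properties-based FlinkKafkaProducer constructor. A sketch, with placeholder broker addresses and example values only:

```java
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
// give batches more time before they are expired (Kafka defaults: 30s / 120s)
producerProps.setProperty("request.timeout.ms", "120000");
producerProps.setProperty("delivery.timeout.ms", "300000");

input.addSink(
    new FlinkKafkaProducer<>(
        "k8s-test-data",              // the topic from the log above
        new SimpleStringSchema(),     // placeholder serialization schema
        producerProps));
```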

Let Flink SQL PlannerExpressionParserImpl#FieldRefrence use Unicode as its default charset

2020-01-15 Thread ??????
Hi all, 
The related issue: https://issues.apache.org/jira/browse/FLINK-15573


As the title says, what I want to do is let the `FieldRefrence` use 
Unicode as its default charset (or maybe as an optional charset which can 
be configured).
According to `PlannerExpressionParserImpl`, Flink currently uses 
JavaIdentifier as the default charset for `FieldRefrence`. But, from my 
perspective, that is not enough. Consider a user who uses Elasticsearch as a 
sink: we all know that ES has a field called `@timestamp`, which a JavaIdentifier 
cannot express.


So in my team, we just let `PlannerExpressionParserImpl#FieldRefrence` 
use Unicode as its default charset, which solves this kind of problem. (Please 
refer to the issue I mentioned above.)


In my opinion, the change would be useful for general purposes:
Firstly, MySQL supports Unicode as the default field charset, see the field 
named `@@`, so shall we support Unicode as well? 



What's more, my team really gets a lot of benefit from 
this change. I also believe that it can give other users more benefits without 
any harm!
Fortunately, the change is fully forward compatible, since 
Unicode is a superset of JavaIdentifier. Only a few code changes are needed to 
achieve this goal.
Looking forward to any opinions.
 
btw, thanks to tison~





Best Regards
Shoi Liu






RE: Table API: Joining on Tables of Complex Types

2020-01-15 Thread Hailu, Andreas
Dawid, this approach looks promising. I'm able to flatten out my Avro records 
into Rows and run simple queries atop of them. I've got a question - when I 
register my Rows as a table, I see the following log providing a warning:

2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class 
org.apache.flink.types.Row does not contain a getter for field fields
2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class 
org.apache.flink.types.Row does not contain a setter for field fields
2020-01-14 17:16:43,084 [main] INFO  TypeExtractor - Class class 
org.apache.flink.types.Row cannot be used as a POJO type because not all fields 
are valid POJO fields, and must be processed as GenericType. Please read the 
Flink documentation on "Data Types & Serialization" for details of the effect 
on performance.

Will this be problematic even now that we've provided TypeInfos for the Rows? 
Performance is something that I'm concerned about as I've already introduced a 
new operation to transform our records to Rows.

// ah
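
For reference, a sketch of the conversion with an explicit RowTypeInfo; the field names/types below are invented for the example. Supplying the type information through returns() keeps the Row fields typed for the Table API, so the reflective analysis that produces the INFO lines above should not force a GenericType:

```java
RowTypeInfo rowType = new RowTypeInfo(
        new TypeInformation<?>[]{Types.STRING, Types.LONG},
        new String[]{"userName", "lastLogin"}); // illustrative fields

DataSet<Row> rows = usersDataset
        .map(record -> {
            Row row = new Row(2);
            Object userName = record.get("userName");
            row.setField(0, userName == null ? null : userName.toString()); // Avro Utf8 -> String
            row.setField(1, (Long) record.get("lastLogin"));
            return row;
        })
        .returns(rowType);

Table users = batchTableEnvironment.fromDataSet(rows);
```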

From: Hailu, Andreas [Engineering]
Sent: Wednesday, January 8, 2020 12:08 PM
To: 'Dawid Wysakowicz' ; user@flink.apache.org
Cc: Richards, Adam S [Engineering] 
Subject: RE: Table API: Joining on Tables of Complex Types

Very well - I'll give this a try. Thanks, Dawid.

// ah

From: Dawid Wysakowicz mailto:dwysakow...@apache.org>>
Sent: Wednesday, January 8, 2020 7:21 AM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>; 
user@flink.apache.org
Cc: Richards, Adam S [Engineering] 
mailto:adam.richa...@ny.email.gs.com>>
Subject: Re: Table API: Joining on Tables of Complex Types


Hi Andreas,

Converting your GenericRecords to Rows would definitely be the safest option. 
You can check how its done in the 
org.apache.flink.formats.avro.AvroRowDeserializationSchema. You can reuse the 
logic from there to write something like:

DataSet dataset = ...

dataset.map( /* convert GenericRecord to Row 
*/).returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString));

Another thing you could try is to make sure that GenericRecord is seen as an 
avro type by fink (flink should understand that avro type is a complex type):

dataset.returns(new GenericRecordAvroTypeInfo(/*schema string*/)

than the TableEnvironment should pick it up as a structured type and flatten it 
automatically when registering the Table. Bear in mind the returns method is 
part of SingleInputUdfOperator so you can apply it right after some 
transformation e.g. map/flatMap etc.

Best,

Dawid


On 06/01/2020 18:03, Hailu, Andreas wrote:
Hi David, thanks for getting back.

From what you've said, I think we'll need to convert our GenericRecord into 
structured types - do you have any references or examples I can have a look 
at? If not, perhaps you could just show me a basic example of flattening a 
complex object with accessors into a Table of structured types. Or by 
structured types, did you mean Row?

// ah

From: Dawid Wysakowicz 
Sent: Monday, January 6, 2020 9:32 AM
To: Hailu, Andreas [Engineering] 
; 
user@flink.apache.org
Cc: Richards, Adam S [Engineering] 

Subject: Re: Table API: Joining on Tables of Complex Types


Hi Andreas,

First of all I would highly recommend converting a non-structured types to 
structured types as soon as possible as it opens more possibilities to optimize 
the plan.

Have you tried:

Table users = 
batchTableEnvironment.fromDataSet(usersDataset).select("getField(f0, userName) 
as userName", "f0")
Table other = 
batchTableEnvironment.fromDataSet(otherDataset).select("getField(f0, userName) 
as user", "f1")

Table result = other.join(users, "user = userName")

You could also check how the 
org.apache.flink.formats.avro.AvroRowDeserializationSchema class is implemented 
which internally converts an avro record to a structured Row.

Hope this helps.

Best,

Dawid
On 03/01/2020 23:16, Hailu, Andreas wrote:
Hi folks,

I'm trying to join two Tables which are composed of complex types, Avro's 
GenericRecord to be exact. I have to use a custom UDF to extract fields out of 
the record and I'm having some trouble on how to do joins on them as I need to 
call this UDF to read what I need. Example below:

batchTableEnvironment.registerFunction("getField", new GRFieldExtractor()); // 
GenericRecord field extractor
Table users = batchTableEnvironment.fromDataSet(usersDataset); // Converting 
from some pre-existing DataSet
Table otherDataset = batchTableEnvironment.fromDataSet(someOtherDataset);
Table userNames = t.select("getField(f0, userName)"); // This is how the UDF is 
used, as GenericRecord is a complex type requiring you to invoke a get() method 
on the field you're interested in. Here we get a get on field 'userName'

I'd like to do something using the Table API similar to the query "SELECT * 
from 

Re: Custom File Sink using EventTime and defined custom file name for parquet file

2020-01-15 Thread Kostas Kloudas
Oops, sorry for not sending the reply to everyone
and thanks David for reposting it here.
Great to hear that you solved your issue!

Kostas



On Wed, Jan 15, 2020 at 1:57 PM David Magalhães  wrote:
>
> Sorry, I've only saw the replies today.
>
> Regarding my previous email,
>
>> Still, there is something missing in this solution to close a window
>> with a given timeout, so it can write into the sink the last events if no 
>> more events are sent.
>
>
> I've fixed this using a custom trigger,
>
> val flag = ctx.getPartitionedState(valueStateDescriptor).value()
>
> // Flag only used to register one trigger per window. Flag is cleaned when 
> FIRE action is executed.
> if (!flag) {
>   val delay = window.getEnd - window.getStart
>   ctx.getPartitionedState(valueStateDescriptor).update(true)
>   ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
>   ctx.registerEventTimeTimer(window.maxTimestamp())
> }
>
> TriggerResult.CONTINUE
>
> Leonard, by "duplicated events" I mean store the same event on different 
> parquet files, since the file format was "part-X-Y". So, if I start to 
> process the same stream again (from a point in the past) I couldn't overwrite 
> the files with exactly the same name.
>
> I think I've read a blogpost about them (Pinterest), I will check the video.
>
> Kostas, replied to only me, I'm adding his response here.
>
>> Hi David,
>> I skimmed through the solution with the window before the sink.
>> If this solution fits your needs, I think you could:
>> 1)  just specify a BucketAssigner instead of writing a custom sink,
>> this will allow you to not lose any functionality from the
>> StreamingFileSink
>> 2)  for the timeout requirement, you could use a (keyed) process
>> function with map state to hold your event-time windows. The key will
>> be the window start (or interval) and you can register timers to fire
>> at the end of the window or after a certain period of inactivity. I
>> think that [1] can be a good starting point.
>> I hope this helps,
>> Kostas
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
>
> I think I can only define partition name on BucketAssigner, because I don't 
> want to have many partition (currently I've accountId and MM (year and 
> month)). I've checked that on Flink 1.10 [1] we will have access to configure 
> a prefix and suffix for the filename, where I could add the day and hour to 
> the prefix, and when I needed to store again the same events I could start 
> from specific time (probably match with a Kafka offset) and remove the files 
> with prefix date newer than this time.
>
> The only scenario for this case is when for some reason Flink is writing bad 
> files (events with wrong information for some reason), that need to be stored 
> (processed) again.
>
> For 2), my implementation with the trigger solved this.
>
> [1] 
> https://github.com/apache/flink/blob/master/docs/dev/connectors/streamfile_sink.md
>
> On Tue, Jan 14, 2020 at 6:28 PM Till Rohrmann  wrote:
>>
>> Hi David,
>>
>> I'm pulling in Kostas who worked on the StreamingFileSink and might be able 
>> to answer some of your questions.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 13, 2020 at 2:45 PM Leonard Xu  wrote:
>>>
>>> Hi, David
>>>
>>> For your first description, I’m a little confused about duplicated records 
>>> when backfilling, could you describe your usage scenario/code more?
>>>
>>> I remembered a backfill user solution from Pinterest which is very similar 
>>> to yours and using Flink too[1], hope that can help you.
>>>
>>> Best,
>>> Leonard
>>>
>>> [1] 
>>> https://www.youtube.com/watch?v=3-X6FJ5JS4E=PLDX4T_cnKjD207Aa8b5CsZjc7Z_KRezGz=64
>>>
>>> 在 2020年1月10日,12:14,David Magalhães  写道:
>>>
>>> Hi, I'm working for the first time with Flink and I'm trying to create 
>>> solution that will store events from Kafka into Parquet files in S3. This 
>>> also should support re-injection of events from Parquet files into a Kafka 
>>> topic.
>>>
>>> Here is the code with a simple usage of StreamingFileSink with BulkEncode 
>>> that will get the events and store in parquet files. The files will be 
>>> partition by account_id and year and month (MM). The issue with this 
>>> approach is when running the backfill from a certain point in time, it will 
>>> be hard to not generate duplicated events, since we will not override the 
>>> same files, as the filename is generate by 
>>> "part--".
>>>
>>> To add predictability, I've used a tumbling window to aggregate multiple 
>>> GenericRecord, in order to write the parquet file with a list of them. For 
>>> that I've created a custom file sink, but I'm not sure of the properties I 
>>> am going to lose compared to the Streaming File Sink. Here is the code. 
>>> Still, there is something missing in this solution to close a window
>>> with a given timeout, so it can write into the sink the last events if no 
>>> more events are sent.
>>>
>>> Another 


Re: Custom File Sink using EventTime and defined custom file name for parquet file

2020-01-15 Thread David Magalhães
Sorry, I've only saw the replies today.

Regarding my previous email,

Still, there is something missing in this solution to close a window
> with a given timeout, so it can write into the sink the last events if no
> more events are sent.


I've fixed this using a custom trigger,

val flag = ctx.getPartitionedState(valueStateDescriptor).value()

// Flag only used to register one trigger per window. Flag is cleaned when
FIRE action is executed.
if (!flag) {
  val delay = window.getEnd - window.getStart
  ctx.getPartitionedState(valueStateDescriptor).update(true)
  ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
  ctx.registerEventTimeTimer(window.maxTimestamp())
}

TriggerResult.CONTINUE

Leonard, by "duplicated events" I mean store the same event on different
parquet files, since the file format was "part-X-Y". So, if I start to
process the same stream again (from a point in the past) I couldn't
overwrite the files with exactly the same name.

I think I've read a blogpost about them (Pinterest), I will check the video.

Kostas, replied to only me, I'm adding his response here.

Hi David,
> I skimmed through the solution with the window before the sink.
> If this solution fits your needs, I think you could:
> 1)  just specify a BucketAssigner instead of writing a custom sink,
> this will allow you to not lose any functionality from the
> StreamingFileSink
> 2)  for the timeout requirement, you could use a (keyed) process
> function with map state to hold your event-time windows. The key will
> be the window start (or interval) and you can register timers to fire
> at the end of the window or after a certain period of inactivity. I
> think that [1] can be a good starting point.
> I hope this helps,
> Kostas
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html


I think I can only define partition name on *BucketAssigner*, because I
don't want to have many partition (currently I've *accountId* and *MM*
(year and month)). I've checked that on Flink 1.10 [1] we will have access
to configure a prefix and suffix for the filename, where I could add the
day and hour to the prefix, and when I needed to store again the same
events I could start from specific time (probably match with a Kafka
offset) and remove the files with prefix date newer than this time.

The only scenario for this case is when for some reason Flink is writing
bad files (events with wrong information for some reason), that need to be
stored (processed) again.

For 2), my implementation with the trigger solved this.

[1]
https://github.com/apache/flink/blob/master/docs/dev/connectors/streamfile_sink.md
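
For completeness, a rough sketch of the keyed-process-function-plus-timers alternative Kostas describes above; the element type, key type and timeout are illustrative, and the event-time window handling is left out. Each element refreshes the last-seen timestamp, and only a timer that corresponds to a real idle period flushes the buffer.

```java
public class InactivityFlushFunction
        extends KeyedProcessFunction<String, GenericRecord, List<GenericRecord>> {

    private static final long INACTIVITY_TIMEOUT_MS = 60_000L; // illustrative

    private transient ListState<GenericRecord> buffer;
    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", GenericRecord.class));
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(GenericRecord value, Context ctx,
                               Collector<List<GenericRecord>> out) throws Exception {
        buffer.add(value);
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        // schedule a flush for the case where no further element arrives
        ctx.timerService().registerProcessingTimeTimer(now + INACTIVITY_TIMEOUT_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<List<GenericRecord>> out) throws Exception {
        Long last = lastSeen.value();
        // only flush if the key has really been idle since the timer was set
        if (last != null && timestamp >= last + INACTIVITY_TIMEOUT_MS) {
            List<GenericRecord> elements = new ArrayList<>();
            buffer.get().forEach(elements::add);
            if (!elements.isEmpty()) {
                out.collect(elements);
            }
            buffer.clear();
            lastSeen.clear();
        }
    }
}
```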

On Tue, Jan 14, 2020 at 6:28 PM Till Rohrmann  wrote:

> Hi David,
>
> I'm pulling in Kostas who worked on the StreamingFileSink and might be
> able to answer some of your questions.
>
> Cheers,
> Till
>
> On Mon, Jan 13, 2020 at 2:45 PM Leonard Xu  wrote:
>
>> Hi, David
>>
>> For your first description, I’m a little confused about duplicated records
>> when backfilling, could you describe your usage scenario/code more?
>>
>> I remembered a backfill user solution from Pinterest which is very
>> similar to yours and using Flink too[1], hope that can help you.
>>
>> Best,
>> Leonard
>>
>> [1]
>> https://www.youtube.com/watch?v=3-X6FJ5JS4E=PLDX4T_cnKjD207Aa8b5CsZjc7Z_KRezGz=64
>>
>>
>> 在 2020年1月10日,12:14,David Magalhães  写道:
>>
>> Hi, I'm working for the first time with Flink and I'm trying to create
>> solution that will store events from Kafka into Parquet files in S3. This
>> also should support re-injection of events from Parquet files into a Kafka
>> topic.
>>
>> Here
>> 
>> is the code with a simple usage of StreamingFileSink with BulkEncode that
>> will get the events and store in parquet files. The files will be partition
>> by account_id and year and month (MM). The issue with this approach is
>> when running the backfill from a certain point in time, it will be hard to
>> not generate duplicated events, since we will not override the same files,
>> as the filename is generate by "*part--*
>> ".
>>
>> To add predictability, I've used a tumbling window to aggregate multiple
>> GenericRecord, in order to write the parquet file with a list of them. For
>> that I've created a custom file sink, but I'm not sure of the properties I
>> am going to lose compared to the Streaming File Sink. Here
>> 
>> is the code. Still, there is something missing in this solution to close a
>> window with a given timeout, so it can write into the sink the last
>> events if no more events are sent.
>>
>> Another work around, would be create a StreamingFileSink with a
>> RowEncoder, and receive a List of GenericRecord, and create a custom
>> Encoder with *AvroParquetWritter* to write to a File. This way I have
>> access to a custom 

Re: Slots Leak Observed when

2020-01-15 Thread Till Rohrmann
Hi,

have you tried one of the latest Flink versions to see whether the problem
still exists? I'm asking because there are some improvements which allow
for slot reconciliation between the TaskManager and the JobMaster [1]. As a
side note, the community is no longer supporting Flink 1.6.x.

For further debugging the DEBUG logs would be necessary.

[1] https://issues.apache.org/jira/browse/FLINK-11059

Cheers,
Till

On Wed, Jan 15, 2020 at 7:25 AM Xintong Song  wrote:

> Hi,
> It would be helpful for understanding the problem if you could share the
> logs.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jan 15, 2020 at 12:23 AM burgesschen 
> wrote:
>
>> Hi guys,
>>
>> Out team is observing a stability issue on our Standalone Flink clusters.
>>
>> Background: The kafka cluster our flink jobs read from / write to has some
>> issues, and every 10 to 15 mins one of the partition leaders switches. This
>> causes jobs that write to/ read from that topic fail and restart. Usually
>> this is not a problem since the jobs can restart and work with the new
>> partition leader. However, one of those restarts can make the jobs enter a
>> failing state and never be able to recover.
>>
>> In the failing state. The jobmanager has exception:
>>
>>
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not allocate all requires slots within timeout of 30 ms. Slots
>> required: 24, slots allocated: 12
>>
>> During that time, 2 of the taskmanagers are reporting that all the slots on
>> them are occupied; however, from the dashboard of the jobmanager, no job is
>> deployed to those 2 taskmanagers.
>>
>> My guess is that since the jobs restart fairly frequently, at some point the
>> slots were not released properly when jobs failed, resulting in the
>> jobmanager falsely believing that those 2 taskmanagers' slots are still
>> occupied.
>>
>> It does sound like an issue mentioned in
>> https://issues.apache.org/jira/browse/FLINK-9932
>> but we are using 1.6.2 and according to the jira ticket, this bug is fixed
>> in 1.6.2
>>
>> Please let me know if you have any ideas or how we can prevent it. Thank
>> you
>> so much!
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Fail to deploy flink on k8s in minikube

2020-01-15 Thread Jary Zhen
Thanks to YangWang and 刘建刚, this message is helpful for me too.
Besides, which Flink version can be deployed on k8s?

On Mon, 13 Jan 2020 at 13:51, 刘建刚  wrote:

> Thank you for your help.
>
> Yang Wang wrote on Mon, Jan 13, 2020 at 12:53 PM:
>
>> Hi, Jiangang
>>
>> Glad to hear that you are looking to run Flink on Kubernetes.
>>
>> It is just because you are using the new Kubernetes version. The
>> extensions/v1beta1
>> API has been removed since v1.16. Please use apps/v1 instead. apps/v1 has been
>> available
>> since v1.9.0. I will create a ticket to fix the documentation.
>>
>> Before release-1.10, you could use standalone per-job[1] or standalone
>> session[2] cluster on
>> K8s. There are some existing K8s operators to manage the application
>> lifecycle(e.g. google flink-on-k8s-operator[3],
>> lyft flink-k8s-operator[4]).
>>
>> Running Flink native on K8s is supported from 1.10. You could find it
>> here [5]. It aims to make
>> it more convenient for Flink users to deploy Flink workloads on a K8s cluster.
>> However, we only support
>> session cluster now. The per-job mode is in development.
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.9/flink-container/kubernetes/README.md#deploy-flink-job-cluster
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#flink-session-cluster-on-kubernetes
>>
>> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
>> [3] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
>> [4] https://github.com/lyft/flinkk8soperator
>> [5]
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>>
>> Best,
>> Yang
>>
>> 刘建刚 wrote on Mon, Jan 13, 2020 at 12:14 PM:
>>
>>>   I fail to deploy flink on k8s referring to
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html
>>>   When I run the command 'kubectl create -f
>>> jobmanager-deployment.yaml', the following error is reported:
>>> [image: image.png]
>>>   I am new to k8s. Our team wants to deploy flink on k8s. Can anyone
>>> help me resolve this issue? Can anyone give me some tutorial about k8s and
>>> flink in production? Thank you very much.
>>>
>>


Re: How Flink read files from local filesystem

2020-01-15 Thread Tillman Peng
You can use env.readTextFile(path), which accepts the path to a directory and 
reads all files in that directory, producing a record for each line.
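
For example (path and parallelism are placeholders); the directory is turned into input splits that are distributed over the parallel source instances, so a distributed file system is not required for a single-node setup:

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // placeholder

// reads every file under the directory; each parallel instance reads its own splits
DataSet<String> lines = env.readTextFile("file:///data/input"); // placeholder path
lines.print();
```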


on 2020/1/15 17:58, Soheil Pourbafrani wrote:
Suppose we have a single-node Flink cluster with multiple slots, and some 
input files exist in the local file system. In this case, where we have no 
distributed file system to dedicate each file's blocks to taskmanagers, 
how will Flink read the file? Will all the task managers open the 
file separately and read some dedicated portion of the file in parallel?


Re: [DISCUSS] decentralized scheduling strategy is needed

2020-01-15 Thread HuWeihua
Hi, Andrey

Thanks for your response.

I have checked this Jira ticket and I think it can work in standalone mode, 
where the TaskManagers have been started before tasks are scheduled.
But we are currently running flink on yarn in per-job cluster mode.

I noticed that this issue has already been raised. I will keep watching this 
ticket. 

Thanks again.

Best
Weihua Hu

> On Jan 15, 2020, at 17:53, Andrey Zagrebin wrote:
> 
> HI HuWeihua,
> 
> I think your issue should be resolved with 1.9.2 and 1.10 (not released yet but in 
> progress).
> You can check the related Jira ticket [1].
> 
> Best,
> Andrey
> 
> [1] https://jira.apache.org/jira/browse/FLINK-12122 
> 
> On Wed, Jan 15, 2020 at 10:08 AM HuWeihua  > wrote:
> Hi, All
> We encountered some problems during the upgrade from Flink 1.5 to Flink 1.9. 
> Flink's scheduling strategy has changed. Flink 1.9 prefers centralized 
> scheduling, while Flink 1.5 prefers decentralized scheduling. This change has 
> caused resource imbalance and blocked our upgrade plan. We have thousands of 
> jobs that need to be upgraded.
> 
> For example,
> there is a job with 10 sources and 100 sinks. Each source needs 1 core and 
> each sink needs 0.1 core.
> Try to run this job on Yarn, with numberOfTaskSlots configured to 10 and 
> yarn.containers.vcores to 2.
> 
> When using Flink-1.5:
> Each TaskManager will run 1 source and 9 sinks, which need 1.9 cores in total. 
> So the job works very well with this configuration. The schedule results are 
> shown in Figure 1.
> When using Flink-1.9:
> The 10 sources will be scheduled to one TaskManager and the 100 sinks will be 
> scheduled to the other 10 TaskManagers. The schedule results are shown in Figure 
> 2.
> In this scenario, the TaskManager which runs the sources needs 10 cores, while 
> the other TaskManagers need 1 core each. But all TaskManagers must be configured 
> the same, so we need 11 TaskManagers with 10 cores each. 
> This situation wastes (10-2)*11 = 88 cores more than Flink 1.5.
> 
> In addition to the waste of resources, we also encountered other problems 
> caused by the centralized scheduling strategy:
> Network bandwidth. Tasks of the same type are scheduled to one 
> TaskManager, causing too much network traffic on the machine.
> Some jobs need to sink to a local agent. After centralized scheduling, the 
> insufficient processing capacity of the single machine causes a backlog of 
> consumption.
> 
> In summary, we think a decentralized scheduling strategy is necessary.
> 
> 
> Figure 1. Flink 1.5 schedule results
> [pasted image: 粘贴的图形-3.tiff]
> 
> Figure 2. Flink 1.9 schedule results
> [pasted image: 粘贴的图形-4.tiff]
> 
> 
> 
> Best
> Weihua Hu
> 



Re: [DISCUSS] decentralized scheduling strategy is needed

2020-01-15 Thread Chesnay Schepler
This is a known issue that will be fixed in 1.9.2/1.10.0; see 
https://issues.apache.org/jira/browse/FLINK-12122 .


On 15/01/2020 10:07, HuWeihua wrote:

Hi, All
We encountered some problems during the upgrade from Flink 1.5 to 
Flink 1.9. Flink's scheduling strategy has changed. Flink 1.9 prefers 
centralized scheduling, while Flink 1.5 prefers decentralized 
scheduling. This change has caused resource imbalance and blocked our 
upgrade plan. We have thousands of jobs that need to be upgraded.


For example,
there is a job with 10 sources and 100 sinks. Each source needs 1 core 
and each sink needs 0.1 core.
Try to run this job on Yarn, with numberOfTaskSlots configured to 10 and 
yarn.containers.vcores to 2.


When using Flink-1.5:
Each TaskManager will run 1 source and 9 sinks, which need 1.9 cores 
in total. So the job works very well with this configuration. 
The schedule results are shown in Figure 1.

When using Flink-1.9:
The 10 sources will be scheduled to one TaskManager and the 100 sinks 
will be scheduled to the other 10 TaskManagers. The schedule results are shown 
in Figure 2.
In this scenario, the TaskManager which runs the sources needs 10 cores, 
while the other TaskManagers need 1 core each. But all TaskManagers must be 
configured the same, so we need 11 TaskManagers with 10 cores each.

This situation wastes (10-2)*11 = 88 cores more than Flink 1.5.

In addition to the waste of resources, we also encountered other 
problems caused by the centralized scheduling strategy.


 1. Network bandwidth. Tasks of the same type are scheduled to one
TaskManager, causing too much network traffic on the machine.

 2. Some jobs need to sink to a local agent. After centralized
scheduling, the insufficient processing capacity of the single
machine causes a backlog of consumption.


In summary, we think a decentralized scheduling strategy is necessary.


Figure 1. Flink 1.5 schedule results

Figure 2. Flink 1.9 schedule results



Best
Weihua Hu





How Flink read files from local filesystem

2020-01-15 Thread Soheil Pourbafrani
Hi,

Suppose we have a single-node Flink cluster with multiple slots, and some
input files exist in the local file system. In this case, where we have no
distributed file system to dedicate each file's blocks to taskmanagers, how
will Flink read the file? Will all the task managers open the file
separately and read some dedicated portion of the file in parallel?


Re: Help: possible duplicate computation in a stream-join scenario

2020-01-15 Thread Ren Xie
Thanks!

I'll look into it.
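
A minimal sketch of the Blink deduplication pattern mentioned below, assuming a registered `student` table with a processing-time attribute `proctime` and the Blink planner enabled; it keeps only the latest student row per key, so a later name update replaces the old row instead of adding a second one:

```java
Table latestStudent = tEnv.sqlQuery(
    "SELECT student_id, name FROM (" +
    "  SELECT *, ROW_NUMBER() OVER (PARTITION BY student_id ORDER BY proctime DESC) AS row_num" +
    "  FROM student" +
    ") WHERE row_num = 1");
tEnv.registerTable("student_dedup", latestStudent);
// the join can then read from student_dedup instead of student
```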

JingsongLee wrote on Wed, Jan 15, 2020 at 11:57 AM:

> Hi ren,
>
> Blink's deduplication feature should be able to match your requirement. [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>
> Best,
> Jingsong Lee
>
>
> --
> From: Caizhi Weng 
> Send Time: Wed, Jan 15, 2020 11:53
> To: user-zh 
> Subject: Re: Help: possible duplicate computation in a stream-join scenario
>
> Hi,
>
> Flink currently treats all sources as append-only; retract, upsert and so on
> are internal processing concepts that are not visible to users.
>
> So for now you can only implement this with group by and last_value. However,
> there are plans to support this kind of requirement in 1.11.
>
> Ren Xie wrote on Tue, Jan 14, 2020 at 9:30 PM:
>
> > Thanks
> >
> > We considered group by, but in practice the table has a lot of fields, and
> > any one of them might change.
> >
> > Is this kind of dual-stream operation common in development? I cannot find
> > any similar usage anywhere. In principle, one stream depending on another
> > stream for processing should be common, right?
> >
> > Or is my requirement / implementation simply an unorthodox approach?
> >
> > Yuan,Youjun wrote on Tue, Jan 14, 2020 at 8:22 PM:
> >
> > > It depends on the specific scenario. A few options come to mind:
> > > 1. Group by student_id and student_name instead of only group by
> > > student_id. Of course, this assumes that renaming does not push a message into stream 1.
> > > 2. Filter out the update messages.
> > > 3. Time-window-based aggregation: for the student table data, output a unique student_id every n seconds and then join with the score stream.
> > >
> > > -----Original Message-----
> > > From: xin Destiny 
> > > Sent: Tuesday, January 14, 2020 6:39 PM
> > > To: user-zh@flink.apache.org
> > > Subject: Re: Help: possible duplicate computation in a stream-join scenario
> > >
> > > Hi,
> > > What about inserting two update operations, one with a score of -97 and one with 97?
> > >
> > >
> > >
> > >
> > > Ren Xie wrote on Tue, Jan 14, 2020 at 6:20 PM:
> > >
> > > > The real scenario is a bit complex; to make it easier to understand I
> > > > simplified it to this. There is no actual code for the simplified version, sorry.
> > > >
> > > > Roughly written out, it looks like this:
> > > > ```sql
> > > > select sum(score)
> > > > from
> > > > student t1 inner join score t2 on t1.student_id = t2.std_id where
> > > > t1.student_id = 11
> > > > ```
> > > > and then
> > > >
> > > > ```Java
> > > > String sql = ↑;
> > > > Table t = tEnv.sqlQuery(sql);
> > > > DataStream stream1 = tEnv.toAppendStream(t, Integer.class);
> > > > stream1.keyBy("").sum("");
> > > > ```
> > > >
> > > > With this SQL, after one row is inserted into student and two rows into
> > > > score, one computation runs and produces the result 97 + 98.
> > > >
> > > > After updating the name in the student table, a new event enters the
> > > > student stream and triggers another computation, again producing 97 + 98.
> > > >
> > > > Because new scores may be inserted, a sum is applied on stream1, so 97
> > > > and 98 are each counted twice.
> > > >
> > > >
> > > > Caizhi Weng wrote on Tue, Jan 14, 2020 at 5:49 PM:
> > > >
> > > > > Hi,
> > > > >
> > > > > If possible, could you share the code?
> > > > >
> > > > > Ren Xie wrote on Tue, Jan 14, 2020 at 5:38 PM:
> > > > >
> > > > > > Student
> > > > > > student_id name
> > > > > > 11 foo
> > > > > >
> > > > > > Subject scores
> > > > > > id name score std_id
> > > > > > 100 math 97 11
> > > > > > 101 english 98 11
> > > > > >
> > > > > > There is the following scenario (assume there is only one student):
> > > > > >
> > > > > > Based on the binlog we detect changes of these 2 tables and compute
> > > > > > the student's total score, using a Table/SQL API join.
> > > > > >
> > > > > > Suppose that after inserting the data above, at some point all of it
> > > > > > has entered Flink, and the student's total score is computed as 97 + 98 = 195.
> > > > > >
> > > > > > But then it is discovered that the student's name was registered
> > > > > > incorrectly, so it is corrected.
> > > > > > As a result the student stream in Flink now has 2 events (one insert
> > > > > > + one update) and the score stream has 2 events, so the computed total becomes 2 * (97 + 98) = 390.
> > > > > >
> > > > > > Q: In this scenario, what can be used to solve this and compute the correct result 97 + 98 = 195?
> > > > > >
> > > > > > I haven't been using Flink for long and don't understand it well; any hints would be appreciated, thanks!!
> > > > > >
> > > > >
> > > >
> > >
> >
>


?????? Re: Re: Re: ??????:flink ????kafka source ????????????

2020-01-15 Thread Others
 lib  







------------------ Original message ------------------
From: "JingsongLee"
https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage


--
From: Others <41486...@qq.com>
Send Time: 2020-01-15 (Wed) 15:54
To: user-zh@flink.apache.org, JingsongLee
http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing 
/Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT.jar 
with 
/Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: 
/Users/lidonghao/Documents/doumob/flinkjob/dependency-reduced-pom.xml
[INFO] 
[INFO] BUILD SUCCESS
[INFO] 
[INFO] Total time: 9.447 s
[INFO] Finished at: 2020-01-15T15:24:56+08:00
[INFO] Final Memory: 69M/781M
[INFO] 

Process finished with exit code 0

[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the 
shaded jar.

------------------ Original message ------------------
From: "JingsongLee"
http://www.myorganization.org
https://repository.apache.org/content/repositories/snapshots/

????????????:flink ????kafka source ????????????

2020-01-15 Thread Others
 lib  




------------------ Original message ------------------
From: "AS"
See https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies
Put the corresponding connector jar into flink's lib directory (on all nodes), then restart.
Then try it again.






On 2020-01-15 at 14:59, Others <41486...@qq.com> wrote:
The flink version is 1.9.1.
The error is as follows:
2020-01-15 11:57:44,255 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - 
Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 
'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
    at 

Two questions: SideOutput and state in SQL

2020-01-15 Thread izual
1. SideOutput
Following the documentation https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/side_output.html
and the unit test code in SideOutputITCase.scala, I implemented an identical example. However, it fails at runtime with:
Caused by: java.lang.IllegalArgumentException: OutputTag must not be null.
My understanding is that this error is expected, because
val outputTag = OutputTag[String]("side-output")
is constructed on the jobManager, and when the taskManager executes, this variable is null.
But the documentation and the unit test are both written this way, so have I misunderstood something?
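
For comparison, the documented Java pattern looks like the sketch below: the OutputTag is created once as an anonymous subclass (so its type information is kept) and the same instance is used both when emitting and when fetching the side output; the types here are illustrative.

```java
final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

SingleOutputStreamOperator<Integer> mainStream = input
        .process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                out.collect(value);                      // main output
                ctx.output(outputTag, "side-" + value);  // side output
            }
        });

DataStream<String> sideStream = mainStream.getSideOutput(outputTag);
```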


2. Sql with State
Following the Streaming API documentation https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html, my understanding
is that State can be used in scenarios where statistics need to be persisted, so that when the code is upgraded and the job restarted, computation
can continue from the previous data. I tested this, and when running the job from a savepoint the results match expectations.
However, I would like to know how this feature can be used with
SQL: https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/sql.html



Thanks

Re: StreamingFileSink doesn't close multipart uploads to s3?

2020-01-15 Thread Kostas Kloudas
Hi Ken, Jingsong and Li,

Sorry for the late reply.

As Jingsong pointed out, upon calling close() the StreamingFileSink
does not commit the in-progress/pending files.
The reason for this is that the close() method of any UDF including
sink functions is called on both normal termination and termination
due to failure.
Given this, we cannot commit the files, because in case of failure
they should be reverted.

Actually we are currently updating the StreamingFileSink docs to
include this among other things.
Also the differentiation between normal termination and termination
due to failure will hopefully be part of Flink 1.11 and
this is the FLIP to check
https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs.

Cheers,
Kostas
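
To make the checkpointing prerequisite concrete, here is a minimal row-format sketch; the path and intervals are placeholders. Pending files are only promoted to finished part files when a checkpoint completes, so without the enableCheckpointing call they stay in-progress/pending:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // required for the files to be finalized

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
                DefaultRollingPolicy.create()
                        .withRolloverInterval(60_000L)       // placeholder values
                        .withInactivityInterval(60_000L)
                        .withMaxPartSize(128 * 1024 * 1024)
                        .build())
        .build();

stream.addSink(sink);
```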

On Fri, Jan 10, 2020 at 4:45 PM Ken Krugler  wrote:
>
> Hi Kostas,
>
> I didn’t see a follow-up to this, and have also run into this same issue of 
> winding up with a bunch of .inprogress files when a bounded input stream ends 
> and the job terminates.
>
> When StreamingFileSystem.close() is called, shouldn’t all buckets get 
> auto-rolled, so that the .inprogress files become part-xxx files?
>
> Thanks,
>
> — Ken
>
>
> On Dec 9, 2019, at 6:56 PM, Jingsong Li  wrote:
>
> Hi Kostas,
>
> I took a look at StreamingFileSink.close; it just deletes all temporary
> files. I know it is for failover. When a job fails, it should just delete the temp
> files for the next restart.
> But for testing purposes, we just want to run a bounded streaming job. If
> no checkpoint is triggered, nothing will move the final temp files to the
> output path, so the result of this job is wrong.
> Do you have any idea about this? Can we distinguish "fail close" from 
> "success finish close" in StreamingFileSink?
>
> Best,
> Jingsong Lee
>
> On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas  wrote:
>>
>> Hi Li,
>>
>> This is the expected behavior. All the "exactly-once" sinks in Flink
>> require checkpointing to be enabled.
>> We will update the documentation to be clearer in the upcoming release.
>>
>> Thanks a lot,
>> Kostas
>>
>> On Sat, Dec 7, 2019 at 3:47 AM Li Peng  wrote:
>> >
>> > Ok I seem to have solved the issue by enabling checkpointing. Based on the 
>> > docs (I'm using 1.9.0), it seemed like only 
>> > StreamingFileSink.forBulkFormat() should've required checkpointing, but 
>> > based on this experience, StreamingFileSink.forRowFormat() requires it 
>> > too! Is this the intended behavior? If so, the docs should probably be 
>> > updated.
>> >
>> > Thanks,
>> > Li
>> >
>> > On Fri, Dec 6, 2019 at 2:01 PM Li Peng  wrote:
>> >>
>> >> Hey folks, I'm trying to get StreamingFileSink to write to s3 every 
>> >> minute with flink-s3-fs-hadoop, and based on the default rolling policy, 
>> >> which is configured to "roll" every 60 seconds, I thought that would be 
>> >> automatic (I interpreted rolling to mean actually closing a multipart 
>> >> upload to s3).
>> >>
>> >> But I'm not actually seeing files written to s3 at all, instead I see a 
>> >> bunch of open multipart uploads when I check the AWS s3 console, for 
>> >> example:
>> >>
>> >>  "Uploads": [
>> >> {
>> >> "Initiated": "2019-12-06T20:57:47.000Z",
>> >> "Key": "2019-12-06--20/part-0-0"
>> >> },
>> >> {
>> >> "Initiated": "2019-12-06T20:57:47.000Z",
>> >> "Key": "2019-12-06--20/part-1-0"
>> >> },
>> >> {
>> >> "Initiated": "2019-12-06T21:03:12.000Z",
>> >> "Key": "2019-12-06--21/part-0-1"
>> >> },
>> >> {
>> >> "Initiated": "2019-12-06T21:04:15.000Z",
>> >> "Key": "2019-12-06--21/part-0-2"
>> >> },
>> >> {
>> >> "Initiated": "2019-12-06T21:22:23.000Z"
>> >> "Key": "2019-12-06--21/part-0-3"
>> >> }
>> >> ]
>> >>
>> >> And these uploads have been open for a long time. So far, after an hour, 
>> >> none of the uploads have been closed. Is this the expected behavior? If I 
>> >> wanted to get these uploads to actually write to s3 quickly, do I need to 
>> >> configure the hadoop stuff to get that done, like setting a smaller 
>> >> buffer/partition size to force it to upload?
>> >>
>> >> Thanks,
>> >> Li
>
>
>
> --
> Best, Jingsong Lee
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>


Re: Help request: error deploying a Flink job with a Kafka source to the cluster

2020-01-15 Thread AS
Hi:
From the error, it looks like a Kafka-related factory cannot be found.
Please refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies
and put the required connector/format jar into Flink's lib directory, then try again.






On 2020-01-15 14:59, Others <41486...@qq.com> wrote:
The Flink version is 1.9.1.
The error is as follows:
2020-01-15 11:57:44,255 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: findAndCreateTableSource failed.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource 
failed.
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at 
com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 
'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
at 

Re: Re: Re: Help request: error deploying a Flink job with a Kafka source to the cluster

2020-01-15 Thread JingsongLee
+user-zh


--
From:JingsongLee 
Send Time: January 15, 2020 (Wednesday) 16:05
To:Others <41486...@qq.com>
Subject: Re: Re: Re: Help request: error deploying a Flink job with a Kafka source to the cluster

Yes.
Another option is to use the classpath option described in [1] to add multiple jars.

BTW, please keep user-zh in the recipients when replying.

Best,
Jingsong Lee

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage


--
From:Others <41486...@qq.com>
Send Time: January 15, 2020 (Wednesday) 15:54
To:user-zh@flink.apache.org JingsongLee 
Subject: Re: Re: Re: Help request: error deploying a Flink job with a Kafka source to the cluster

Hi,
My cluster is deployed in Standalone mode. Should the jars be added only on the Flink master machine, or on every machine? Do I need to restart the cluster after adding them?


-- Original Message --
From: "JingsongLee";
Sent: January 15, 2020 (Wednesday) 3:46 PM
To: "Others"<41486...@qq.com>; "user-zh";
Subject: Re: Re: Help request: error deploying a Flink job with a Kafka source to the cluster

Hi,

I suspect that packaging this way causes the META-INF/services files to overwrite each other.
Try putting the flink-json and flink-kafka jars directly into flink/lib.

Best,
Jingsong Lee


--
From:Others <41486...@qq.com>
Send Time: January 15, 2020 (Wednesday) 15:27
To:user-zh@flink.apache.org JingsongLee 
Subject: Re: Re: Help request: error deploying a Flink job with a Kafka source to the cluster


In the cluster environment, which lib directory should they be placed in?

Below is the log from the packaging process:
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded 
jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded 
jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 
in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 
in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded 
jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the 
shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440