How can a dimension table support dynamic lookups?

2019-07-02 Thread 雒正林
The dimension table (MySQL) changes dynamically, but when it is joined with the stream table, the join only ever sees the data returned by the first lookup; later changes to the dimension table are never picked up by the join.


Re: For jobs whose state is not large but which need frequent checkpoints, should the state backend be filesystem or RocksDB?

2019-07-02 Thread Yun Tang
Hi,

First of all, even with the RocksDB state backend you still need to write to HDFS; enabling incremental checkpoints only reduces the amount of data written to HDFS on each checkpoint.

I think the core of this question is a trade-off. When no checkpoint is running, the read/write performance of RocksDBStateBackend is worse than the purely in-memory FsStateBackend. In the synchronous phase of a checkpoint, RocksDBStateBackend has to write a full snapshot to local disk, which may be slower than FsStateBackend's in-memory operation and can also affect throughput. In the asynchronous phase, RocksDBStateBackend supports incremental uploads, so the pressure on HDFS may be lower; on the other hand, you can also enable compression for FsStateBackend [1] to reduce its pressure on HDFS.

If you are throughput-sensitive and your state is small, choose FsStateBackend; otherwise choose RocksDBStateBackend, which avoids the risk of OOM.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
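
In code, the two options would look roughly like this (a minimal sketch; the checkpoint path, interval and the small-state decision are placeholders, and the compression switch is the one documented in [1]):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendChoice {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        boolean smallState = true; // placeholder for your own sizing decision

        if (smallState) {
            // Small state, throughput-sensitive: heap-based FsStateBackend,
            // optionally with snapshot compression [1] to reduce what is written to HDFS.
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // placeholder path
            env.getConfig().setUseSnapshotCompression(true);
        } else {
            // Larger state / OOM risk: RocksDB with incremental checkpoints
            // (second constructor argument), so mostly deltas are uploaded to HDFS.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        }

        env.enableCheckpointing(10_000); // placeholder interval in ms
    }
}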

Best,
Yun Tang


From: yeyi9...@sina.com 
Sent: Wednesday, July 3, 2019 11:34
To: user-zh
Subject: For jobs whose state is not large but which need frequent checkpoints, should the state backend be filesystem or RocksDB?

For jobs in Flink whose state is not large but which need frequent checkpoints, should the state backend be filesystem or RocksDB? The official docs say RocksDB is suited to jobs with very large state but may reduce throughput; on the other hand, choosing the filesystem backend puts a lot of pressure on HDFS.


For jobs whose state is not large but which need frequent checkpoints, should the state backend be filesystem or RocksDB?

2019-07-02 Thread yeyi9999
 
For jobs in Flink whose state is not large but which need frequent checkpoints, should the state backend be filesystem or RocksDB? The official docs say RocksDB is suited to jobs with very large state but may reduce throughput; on the other hand, choosing the filesystem backend puts a lot of pressure on HDFS.


Re:Could not load the native RocksDB library

2019-07-02 Thread Haibo Sun
Hi,  Samya.Patro


I guess this may be a setup problem. What OS and what version of JDK do you 
use?  You can try upgrading JDK to see if the issue can be solved.


Best,
Haibo

At 2019-07-02 17:16:59, "Patro, Samya"  wrote:


Hello,
I am using rocksdb for storing state . But when I run the pipeline I get the 
error   ”Could not load the native RocksDB library” .  Kindly can you check the 
configs and error stacktrace and suggest what am I doing wrong .

 

Flink version  - 1.8.0

 


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.8.0</version>
</dependency>


 

This is  the flink checkpointing config I have used

 

executionEnvironment.enableCheckpointing(30);
executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5);

executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60);
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

StateBackend rocksDbBackend = new 
RocksDBStateBackend(parameter.get("stateBackendPath"),true);
executionEnvironment.setStateBackend(rocksDbBackend);

 

When I run the pipeline, I get this error

 

java.lang.Exception: Exception while creating StreamOperatorStateContext.

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for StreamFlatMap_9dd63673dd41ea021b896d5203f3ba7c_(1/5) from any of 
the 1 provided restore options.

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)

... 5 more

Caused by: java.io.IOException: Could not load the native RocksDB library

at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:911)

at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:482)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 7 more

Caused by: java.lang.UnsatisfiedLinkError: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
undefined symbol: malloc_stats_print

at java.lang.ClassLoader$NativeLibrary.load(Native Method)

at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)

at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)

at java.lang.Runtime.load0(Runtime.java:809)

at java.lang.System.load(System.java:1086)

at 
org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)

at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)

at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:888)

... 11 more

 

 

Thanks and  Regards,
Samya Ranjan Patro
Goldman sachs

 






Re: [ANNOUNCE] Apache Flink 1.8.1 released

2019-07-02 Thread Jark Wu
Thanks for being the release manager and the great job!

Cheers,
Jark

On Wed, 3 Jul 2019 at 10:16, Dian Fu  wrote:

> Awesome! Thanks a lot for being the release manager. Great job! @Jincheng
>
> Regards,
> Dian
>
> On Jul 3, 2019, at 10:08 AM, jincheng sun  wrote:
>
> I've also tweeted about it from my twitter:
> https://twitter.com/sunjincheng121/status/1146236834344648704
> It will also be tweeted from @ApacheFlink later!
>
> Best, Jincheng
>
> Hequn Cheng  wrote on Wed, Jul 3, 2019 at 9:48 AM:
>
>> Thanks for being the release manager and the great work Jincheng!
>> Also thanks to Gordon and the community for making this release possible!
>>
>> Best, Hequn
>>
>> On Wed, Jul 3, 2019 at 9:40 AM jincheng sun 
>> wrote:
>>
>>> Hi,
>>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.8.1, which is the first bugfix release for the Apache Flink
>>> 1.8 series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> https://flink.apache.org/news/2019/07/02/release-1.8.1.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345164
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Great thanks to @Tzu-Li (Gordon) Tai  's offline
>>> kind help!
>>>
>>> Regards,
>>> Jincheng
>>>
>>
>


Re:Re: Fw:Re:Re: About "Flink 1.7.0 HA based on zookeepers "

2019-07-02 Thread Alex.Hu
Hi Till,
Thank you very much for answering my question!
In the attachment I have put the logs from my last run (Flink was started with debug-level logging). My Flink cluster runs in a Kubernetes-based Hadoop cluster with 3 nodes and Kerberos security authentication enabled. I built a Flink 1.7.2 image based on the Docker image given in the wiki, and wrote Flink's Kubernetes YAML configuration based on the other Kubernetes configuration files in the Hadoop cluster, in which all pods use the host network directly; for that reason I adjusted some port parameters in my Flink cluster settings. I currently run only this one Flink cluster, and in the earlier test I had only started the JobManager pod. Following your email reply I added the relevant `high-availability.cluster-id` setting and then started the pods on 2 nodes: Kubernetes shows that one node (tdh3) started normally, while the other node (tdh2) failed to start and keeps reporting the same error as in my previous mail. I have attached the Flink configuration file (flink-yaml), the masters and slaves configuration files, the Hadoop cluster's ZooKeeper configuration file, and the JobManager pod configuration file (jobmanager-yaml). Thank you very much. Could you please help me check what mistakes I have made?




Thank you,
Alex.hu






At 2019-07-02 21:46:35, "Till Rohrmann"  wrote:

Hi,


how did you start the job masters? Could you maybe share the logs of all 
components? It looks as if the leader election is not working properly. One 
thing to make sure is that you specify for every new HA cluster a different 
cluster ID via `high-availability.cluster-id: cluster_xy`. That way you 
separate the ZNodes in ZooKeeper so that every cluster uses their own nodes and 
does not interfere with other clusters. Usually this happens via the JobID but 
in the case of the `StandaloneJobClusterEntrypoint` we set it to 0. More 
recently, this was slightly changed. See 
https://issues.apache.org/jira/browse/FLINK-12617 for more information.


Cheers,
Till


On Mon, Jul 1, 2019 at 11:36 AM Alex.Hu  wrote:


Hi,All:

   I ran into the problems with Flink 1.6.0 on Kubernetes that Till mentioned in the "HA for 1.6.0 job cluster with docker-compose" thread on this mailing list. The JIRA issue FLINK-10291 referenced in that email was closed as of 1.7.0, yet I currently see similar errors with Flink 1.7.2 on Kubernetes. Could you please help me check my settings for where the problem is? Here are my settings:

web.log.path: /var/log/flink/flinkweb.log 
taskmanager.log.pth: /var/log/flink/taskmanager/task.log 


jobmanager.rpc.address: tdh2
jobmanager.rpc.port: 16223
jobstore.cache-size: 5368709120
jobstore.expiration-time: 864000
jobmanager.heap.size: 4096m


taskmanager.heap.size:  6000m
taskmanager.numberOfTaskSlots: 6
parallelism.default: 2


high-availability: zookeeper
high-availability.storageDir: hdfs:///flink1/ha/
high-availability.zookeeper.quorum: tdh2:2181,tdh4:2181,tdh3:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.acl: open
high-availability.jobmanager.port: 62236-62239


rest.port: 18801
io.tmp.dirs: /data/disk1:/data/disk2:/data/disk3:/data/disk4:/data/disk5


security.kerberos.login.use-ticket-cache: true
security.kerberos.login.contexts: Client
security.kerberos.login.keytab: /etc/flink/conf/hdfs.keytab
security.kerberos.login.principal: hdfs


blob.server.port: 16224
query.server.port: 16225




   The following is the new error report; the earliest error report is in the forwarded email message below:


apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not 
set: Ignoring message LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, 
LocalRpcInvocation(requestRestAddress(Time))) sent to 
akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)
... 14 common frames omitted
2019-07-01 17:10:40.159 [flink-rest-server-netty-worker-thread-39] ERROR 
o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not retrieve 
the redirect address.
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, 
LocalRpcInvocation(requestRestAddress(Time))) sent to 
akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)

Re: [ANNOUNCE] Apache Flink 1.8.1 released

2019-07-02 Thread Kurt Young
Thanks for being the release manager and great job! @Jincheng

Best,
Kurt


On Wed, Jul 3, 2019 at 10:19 AM Tzu-Li (Gordon) Tai 
wrote:

> Thanks for being the release manager @jincheng sun
>  :)
>
> On Wed, Jul 3, 2019 at 10:16 AM Dian Fu  wrote:
>
>> Awesome! Thanks a lot for being the release manager. Great job! @Jincheng
>>
>> Regards,
>> Dian
>>
>> On Jul 3, 2019, at 10:08 AM, jincheng sun  wrote:
>>
>> I've also tweeted about it from my twitter:
>> https://twitter.com/sunjincheng121/status/1146236834344648704
>> It will also be tweeted from @ApacheFlink later!
>>
>> Best, Jincheng
>>
>> Hequn Cheng  wrote on Wed, Jul 3, 2019 at 9:48 AM:
>>
>>> Thanks for being the release manager and the great work Jincheng!
>>> Also thanks to Gordon and the community for making this release possible!
>>>
>>> Best, Hequn
>>>
>>> On Wed, Jul 3, 2019 at 9:40 AM jincheng sun 
>>> wrote:
>>>
 Hi,

 The Apache Flink community is very happy to announce the release of
 Apache Flink 1.8.1, which is the first bugfix release for the Apache Flink
 1.8 series.

 Apache Flink® is an open-source stream processing framework for
 distributed, high-performing, always-available, and accurate data streaming
 applications.

 The release is available for download at:
 https://flink.apache.org/downloads.html

 Please check out the release blog post for an overview of the
 improvements for this bugfix release:
 https://flink.apache.org/news/2019/07/02/release-1.8.1.html

 The full release notes are available in Jira:

 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345164

 We would like to thank all contributors of the Apache Flink community
 who made this release possible!

 Great thanks to @Tzu-Li (Gordon) Tai  's offline
 kind help!

 Regards,
 Jincheng

>>>
>>


Re: [ANNOUNCE] Apache Flink 1.8.1 released

2019-07-02 Thread Tzu-Li (Gordon) Tai
Thanks for being the release manager @jincheng sun
 :)

On Wed, Jul 3, 2019 at 10:16 AM Dian Fu  wrote:

> Awesome! Thanks a lot for being the release manager. Great job! @Jincheng
>
> Regards,
> Dian
>
> On Jul 3, 2019, at 10:08 AM, jincheng sun  wrote:
>
> I've also tweeted about it from my twitter:
> https://twitter.com/sunjincheng121/status/1146236834344648704
> It will also be tweeted from @ApacheFlink later!
>
> Best, Jincheng
>
> Hequn Cheng  wrote on Wed, Jul 3, 2019 at 9:48 AM:
>
>> Thanks for being the release manager and the great work Jincheng!
>> Also thanks to Gordon and the community for making this release possible!
>>
>> Best, Hequn
>>
>> On Wed, Jul 3, 2019 at 9:40 AM jincheng sun 
>> wrote:
>>
>>> Hi,
>>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.8.1, which is the first bugfix release for the Apache Flink
>>> 1.8 series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> https://flink.apache.org/news/2019/07/02/release-1.8.1.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345164
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Great thanks to @Tzu-Li (Gordon) Tai  's offline
>>> kind help!
>>>
>>> Regards,
>>> Jincheng
>>>
>>
>


Re: [ANNOUNCE] Apache Flink 1.8.1 released

2019-07-02 Thread Dian Fu
Awesome! Thanks a lot for being the release manager. Great job! @Jincheng

Regards,
Dian

> On Jul 3, 2019, at 10:08 AM, jincheng sun  wrote:
> 
> I've also tweeted about it from my twitter: 
> https://twitter.com/sunjincheng121/status/1146236834344648704 
>  
> It will also be tweeted from @ApacheFlink later!
> 
> Best, Jincheng
> 
> Hequn Cheng  wrote on Wed, Jul 3, 2019 at 9:48 AM:
> Thanks for being the release manager and the great work Jincheng!
> Also thanks to Gordon and the community for making this release possible!
> 
> Best, Hequn
> 
> On Wed, Jul 3, 2019 at 9:40 AM jincheng sun  wrote:
> Hi,
> 
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.8.1, which is the first bugfix release for the Apache Flink 1.8 
> series. 
> 
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications. 
> 
> The release is available for download at: 
> https://flink.apache.org/downloads.html  
> 
> 
> Please check out the release blog post for an overview of the 
> improvements for this bugfix release: 
> https://flink.apache.org/news/2019/07/02/release-1.8.1.html 
> 
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345164
>  
> 
> 
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible! 
> 
> Great thanks to @Tzu-Li (Gordon) Tai  's offline 
> kind help!
> 
> Regards,
> Jincheng



Re: [ANNOUNCE] Apache Flink 1.8.1 released

2019-07-02 Thread jincheng sun
I've also tweeted about it from my twitter:
https://twitter.com/sunjincheng121/status/1146236834344648704
It will also be tweeted from @ApacheFlink later!

Best, Jincheng

Hequn Cheng  wrote on Wed, Jul 3, 2019 at 9:48 AM:

> Thanks for being the release manager and the great work Jincheng!
> Also thanks to Gordon and the community for making this release possible!
>
> Best, Hequn
>
> On Wed, Jul 3, 2019 at 9:40 AM jincheng sun 
> wrote:
>
>> Hi,
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.8.1, which is the first bugfix release for the Apache Flink
>> 1.8 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2019/07/02/release-1.8.1.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345164
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Great thanks to @Tzu-Li (Gordon) Tai  's offline
>> kind help!
>>
>> Regards,
>> Jincheng
>>
>


Re: [ANNOUNCE] Apache Flink 1.8.1 released

2019-07-02 Thread Hequn Cheng
Thanks for being the release manager and the great work Jincheng!
Also thanks to Gordon and the community for making this release possible!

Best, Hequn

On Wed, Jul 3, 2019 at 9:40 AM jincheng sun 
wrote:

> Hi,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.8.1, which is the first bugfix release for the Apache Flink 1.8
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> https://flink.apache.org/news/2019/07/02/release-1.8.1.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345164
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Great thanks to @Tzu-Li (Gordon) Tai  's offline
> kind help!
>
> Regards,
> Jincheng
>


[ANNOUNCE] Apache Flink 1.8.1 released

2019-07-02 Thread jincheng sun
Hi,

The Apache Flink community is very happy to announce the release of Apache
Flink 1.8.1, which is the first bugfix release for the Apache Flink 1.8
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the
improvements for this bugfix release:
https://flink.apache.org/news/2019/07/02/release-1.8.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345164

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Great thanks to @Tzu-Li (Gordon) Tai  's offline kind
help!

Regards,
Jincheng


Can Flink infer the table column types?

2019-07-02 Thread Soheil Pourbafrani
Hi

I want to load MySQL tables in Flink without needing to specify column names
and types (like what we can do with Apache Spark DataFrames).

Using the JDBCInputFormat, we have to pass the table field types to the
setRowTypeInfo method. I couldn't find any way to force Flink to infer the
column types.
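
For context, this is roughly what the current API expects (a minimal sketch; driver, URL, credentials and query are placeholders):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class JdbcReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The column types must be declared up front; Flink does not infer them.
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")          // placeholder
                .setDBUrl("jdbc:mysql://host:3306/db")           // placeholder
                .setUsername("user")                              // placeholder
                .setPassword("secret")                            // placeholder
                .setQuery("SELECT id, name FROM my_table")        // placeholder
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        env.createInput(inputFormat).print();
    }
}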

In addition, I was wondering if it's possible to do the same using
TableSource?

thanks


Re: Job tasks are not balance among taskmanagers

2019-07-02 Thread Ken Krugler
Hi Ever,

As Haibo noted, that’s a known regression.

If you fall back to the older approach of having multiple TMs per slave, each 
with one slot, then Flink (as of 1.7/1.8) does a better job of distributing 
work.

— Ken

> On Jul 1, 2019, at 9:23 PM, Haibo Sun  wrote:
> 
> Hi, Ever
> 
> This is a regression wrt the pre Flip-6 code, and the following JIRA 
> dedicated to this issue.
> 
> https://issues.apache.org/jira/browse/FLINK-12122 
>  
>  
> Best,
> Haibo
> 
> 
> 
> At 2019-07-02 11:42:27, "Ever" <439674...@qq.com> wrote:
> Hi, there are 3 TaskManager nodes in our testing Flink cluster, whose
> version is 1.8, and each one has 10 task slots.
> 
> Now I have a job with parallelism 3. 
> I expected the 3 tasks to be placed on 3 different TaskManagers, just as
> in Example 2 below:
> 
> 
> But it turned out that all 3 tasks were located on the same TaskManager.
> [screenshot attachments omitted]
> 
> Why?

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Fw:Re:Re: About "Flink 1.7.0 HA based on zookeepers "

2019-07-02 Thread Till Rohrmann
Hi,

how did you start the job masters? Could you maybe share the logs of all
components? It looks as if the leader election is not working properly. One
thing to make sure is that you specify for every new HA cluster a different
cluster ID via `high-availability.cluster-id: cluster_xy`. That way you
separate the ZNodes in ZooKeeper so that every cluster uses their own nodes
and does not interfere with other clusters. Usually this happens via the
JobID but in the case of the `StandaloneJobClusterEntrypoint` we set it to
0. More recently, this was slightly changed. See
https://issues.apache.org/jira/browse/FLINK-12617 for more information.

Cheers,
Till

On Mon, Jul 1, 2019 at 11:36 AM Alex.Hu  wrote:

> *Hi,All:*
>
> *   I ran into the problems with Flink 1.6.0 on Kubernetes that Till
> mentioned in "HA for 1.6.0 job cluster with docker-compose" on this mailing
> list. The JIRA issue FLINK-10291 referenced in that email was closed as of
> 1.7.0, yet I currently see similar errors with Flink 1.7.2 on Kubernetes.
> Could you please help me check my settings for where the problem is?
> Here are my settings:*
> web.log.path: /var/log/flink/flinkweb.log
> taskmanager.log.pth: /var/log/flink/taskmanager/task.log
>
> jobmanager.rpc.address: tdh2
> jobmanager.rpc.port: 16223
> jobstore.cache-size: 5368709120
> jobstore.expiration-time: 864000
> jobmanager.heap.size: 4096m
>
> taskmanager.heap.size:  6000m
> taskmanager.numberOfTaskSlots: 6
> parallelism.default: 2
>
> high-availability: zookeeper
> high-availability.storageDir: hdfs:///flink1/ha/
> high-availability.zookeeper.quorum: tdh2:2181,tdh4:2181,tdh3:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.client.acl: open
> high-availability.jobmanager.port: 62236-62239
>
> rest.port: 18801
> io.tmp.dirs: /data/disk1:/data/disk2:/data/disk3:/data/disk4:/data/disk5
>
> security.kerberos.login.use-ticket-cache: true
> security.kerberos.login.contexts: Client
> security.kerberos.login.keytab: /etc/flink/conf/hdfs.keytab
> security.kerberos.login.principal: hdfs
>
> blob.server.port: 16224
> query.server.port: 16225
>
>
>*And the following is the new error report, the earliest error report
> in the forwarded email message:*
> apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token
> not set: Ignoring message
> LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146,
> LocalRpcInvocation(requestRestAddress(Time))) sent to 
> akka.tcp://flink@tdh2:62236/user/dispatcher
> because the fencing token is null.
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)
> ... 14 common frames omitted
> 2019-07-01 17:10:40.159 [flink-rest-server-netty-worker-thread-39] ERROR
> o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not
> retrieve the redirect address.
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
> token not set: Ignoring message
> LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146,
> LocalRpcInvocation(requestRestAddress(Time))) sent to 
> akka.tcp://flink@tdh2:62236/user/dispatcher
> because the fencing token is null.
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
> at akka.actor.ActorRef.tell(ActorRef.scala:130)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendErrorIfSender(AkkaRpcActor.java:371)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:57)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at 

Re: Connection refused while trying to query state

2019-07-02 Thread Kostas Kloudas
No problem! Glad I could help.

Kostas

On Tue, Jul 2, 2019 at 12:11 PM Avi Levi  wrote:

> No, it doesn't. Thanks for pointing it. I just noticed that I wasn't using
> the proxy server address.
> Thanks !!!
>
>
> On Tue, Jul 2, 2019 at 12:16 PM Kostas Kloudas  wrote:
>
>> *This Message originated outside your organization.*
>> --
>> Hi Avi,
>>
>> Do you point the client to the correct address? This means where the
>> "Queryable State Proxy Server @ ..." says?
>>
>> Cheers,
>> Kostas
>>
>> On Sun, Jun 30, 2019 at 4:37 PM Avi Levi  wrote:
>>
>>> Hi,
>>> I am trying to query state (cluster 1.8.0 is running on my local
>>> machine) .
>>> I do see in the logs "Started the Queryable State Proxy Server @ ...".
>>>
>>> but when I am trying to query the state from the client ,
>>> val descriptor = new ValueStateDescriptor("queryable-state",
>>> Types.CASE_CLASS[State])
>>> client.getKvState(jobId, "seen-domains",key,
>>> BasicTypeInfo.STRING_TYPE_INFO, descriptor)
>>> I am getting the following exception :
>>>
>>> [ERROR] [06/30/2019 16:59:48.850]
>>> [bvAkkaHttpServer-akka.actor.default-dispatcher-10]
>>> [akka.actor.ActorSystemImpl(AkkaHttpServer)] Error during processing of
>>> request:
>>> 'org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>>> Connection refused: /127.0.0.1:9069'. Completing with 500 Internal
>>> Server Error response. To change default exception handling behavior,
>>> provide a custom ExceptionHandler.
>>> java.util.concurrent.CompletionException:
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>>> Connection refused: /127.0.0.1:9069
>>> at
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>> at
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>> at
>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>>> at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> at
>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>> at
>>> org.apache.flink.queryablestate.network.Client$PendingConnection.close(Client.java:377)
>>> at
>>> org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:270)
>>> at
>>> org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:231)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by:
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>>> Connection refused: /127.0.0.1:9069
>>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
>>> ... 6 more
>>> Caused by: java.net.ConnectException: Connection refused
>>> ... 10 more
>>>
>>>


Re: Connection refused while trying to query state

2019-07-02 Thread Avi Levi
No, it doesn't. Thanks for pointing that out. I just noticed that I wasn't using
the proxy server address.
Thanks !!!


On Tue, Jul 2, 2019 at 12:16 PM Kostas Kloudas  wrote:

> *This Message originated outside your organization.*
> --
> Hi Avi,
>
> Do you point the client to the correct address? This means where the
> "Queryable State Proxy Server @ ..." says?
>
> Cheers,
> Kostas
>
> On Sun, Jun 30, 2019 at 4:37 PM Avi Levi  wrote:
>
>> Hi,
>> I am trying to query state (cluster 1.8.0 is running on my local machine)
>> .
>> I do see in the logs "Started the Queryable State Proxy Server @ ...".
>>
>> but when I am trying to query the state from the client ,
>> val descriptor = new ValueStateDescriptor("queryable-state",
>> Types.CASE_CLASS[State])
>> client.getKvState(jobId, "seen-domains",key,
>> BasicTypeInfo.STRING_TYPE_INFO, descriptor)
>> I am getting the following exception :
>>
>> [ERROR] [06/30/2019 16:59:48.850]
>> [bvAkkaHttpServer-akka.actor.default-dispatcher-10]
>> [akka.actor.ActorSystemImpl(AkkaHttpServer)] Error during processing of
>> request:
>> 'org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> Connection refused: /127.0.0.1:9069
>> '.
>> Completing with 500 Internal Server Error response. To change default
>> exception handling behavior, provide a custom ExceptionHandler.
>> java.util.concurrent.CompletionException:
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> Connection refused: /127.0.0.1:9069
>> 
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> at
>> org.apache.flink.queryablestate.network.Client$PendingConnection.close(Client.java:377)
>> at
>> org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:270)
>> at
>> org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:231)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> Connection refused: /127.0.0.1:9069
>> 
>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
>> ... 6 more
>> Caused by: java.net.ConnectException: Connection refused
>> ... 10 more
>>
>>


Re: LookupableTableSource question

2019-07-02 Thread JingsongLee
> how do I enable Blink planner support? 
After the Flink 1.9 release, you can try the Blink planner.

>Since when is LATERAL TABLE available in Flink? Is it equivalent to using 
>temporal tables?
LATERAL TABLE is the syntax for joining a table function; it has been available in Flink for a long
time [1].
It is different from a temporal table.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/udfs.html#table-functions
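
For illustration, a lateral table-function join looks roughly like this (a minimal sketch against the pre-Blink planner; the table, function and field names are made up):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class LateralTableSketch {

    // Hypothetical lookup function, purely for illustrating the syntax.
    public static class MyLookup extends TableFunction<String> {
        public void eval(String key) {
            collect("value-for-" + key); // emit one row per lookup key
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); // Flink 1.8-style

        DataStream<Tuple2<Long, String>> orders =
                env.fromElements(Tuple2.of(1L, "k1"), Tuple2.of(2L, "k2"));
        tEnv.registerDataStream("Orders", orders, "id, productId");

        tEnv.registerFunction("MyLookup", new MyLookup());

        Table result = tEnv.sqlQuery(
                "SELECT o.id, l.v FROM Orders AS o, LATERAL TABLE(MyLookup(o.productId)) AS l(v)");

        tEnv.toAppendStream(result, Row.class).print();
        env.execute("lateral-table-sketch");
    }
}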

Best, JingsongLee


--
From:Flavio Pompermaier 
Send Time: Monday, July 1, 2019 21:26
To:JingsongLee 
Cc:user 
Subject:Re: LookupableTableSource question

I probably messed up with the meaning of eval()..thus it is called once for 
every distinct key (that could be composed by a combination of fields)?
So, the other question is..how do I enable Blink planner support? 
Since when is LATERAL TABLE available in Flink? Is it equivalent to using 
temporal tables [1]?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html

Best,
Flavio
On Sat, Jun 29, 2019 at 3:16 AM JingsongLee  wrote:
The keys argument means the joint primary key; it is not a list of independent keys. In your case, maybe
there is a single key?

Best, Jingsong Lee


Sent from Alibaba Mail for iPhone
 --Original Mail --
From:Flavio Pompermaier 
Date:2019-06-28 22:53:31
Recipient:JingsongLee 
CC:user 
Subject:Re: LookupableTableSource question
Sorry I copied and pasted twice the current eval method...I'd do this:

public void eval(Object... keys) {
    for (Object kkk : keys) {
        Row keyRow = Row.of(kkk);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
    ...
On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier  wrote:
This could be a good fit, I'll try to dig into it and see if it can be adapted 
to a REST service.
The only strange thing I see is that the key of the local cache is per block of 
keys..am I wrong?
Shouldn't it cycle over the list of passed keys?

Right now it's the following:

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
    ...

while I'd use the following (also for JDBC):

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
    ...

public void eval(Object... keys) {
    for (Object kkk : keys) {
        Row keyRow = Row.of(kkk);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
    ...

Am I missing something?


On Fri, Jun 28, 2019 at 4:18 PM JingsongLee  wrote:
Hi Flavio:

I just implemented a JDBCLookupFunction [1]. You can use it as a table function [2],
or use the Blink temporal table join [3] (needs Blink planner support).
I added a Google Guava cache to JDBCLookupFunction with a configurable cacheMaxSize
(to avoid OOM) and cacheExpireMs (to keep the lookup table fresh).
Is that what you want?

[1] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
[2] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
[3] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75

 Best, JingsongLee

--
From:Flavio Pompermaier 
Send Time: Friday, June 28, 2019 21:04
To:user 
Subject:LookupableTableSource question

Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated 
lookup table.
Basically, I'd like to be able to set a refresh policy that is triggered either 
when a key was not found (a new key has probably been added in the mean time) 
or a configurable refresh-period has elapsed.

Is there any suggested solution to this? The LookupableTableSource looks very 
similar to what I'd like to achieve but I can't find a real-world example using 
it and it lacks of such 2 

Re:Re: File Naming Pattern from HadoopOutputFormat

2019-07-02 Thread Haibo Sun

Hi, Andreas 


You are right. To meet this requirement, Flink would need to expose an
interface that allows customizing the filename.
 

Best,
Haibo

At 2019-07-02 16:33:44, "Yitzchak Lieberman"  wrote:

Regarding option 2 for Parquet:
implementing a bucket assigner won't set the file name, because getBucketId() defines
the directory for the files when partitioning the data, for example:
/day=20190101/part-1-1
There is an open issue for that: 
https://issues.apache.org/jira/browse/FLINK-12573


On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun  wrote:

Hi, Andreas


I think the following things may be what you want.


1. For writing Avro, I think you can extend AvroOutputFormat and override the  
getDirectoryFileName() method to customize a file name, as shown below.
The javadoc of AvroOutputFormat: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html


public static class CustomAvroOutputFormat<E> extends AvroOutputFormat<E> {
    public CustomAvroOutputFormat(Path filePath, Class<E> type) {
        super(filePath, type);
    }

    public CustomAvroOutputFormat(Class<E> type) {
        super(type);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
        super.open(taskNumber, numTasks);
    }

    @Override
    protected String getDirectoryFileName(int taskNumber) {
        // return a custom filename here
        return null;
    }
}


2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, 
StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a 
class that implements the BucketAssigner interface and return a custom file 
name in the getBucketId() method (the value returned by getBucketId() will be 
treated as the file name).


ParquetStreamingFileSinkITCase:  
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java


StreamingFileSink#forBulkFormat: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java


DateTimeBucketAssigner: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
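
For what it's worth, a minimal sketch of such an assigner could look like this (the class and the bucket id are made up; note the caveat from Yitzchak's reply above that the returned value ends up in the bucket path, not in the part-file name itself):

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.util.UUID;

// Hypothetical assigner that puts a UUID into the bucket id.
public class UuidBucketAssigner<IN> implements BucketAssigner<IN, String> {

    private final String uuid = UUID.randomUUID().toString();

    @Override
    public String getBucketId(IN element, Context context) {
        // Whatever is returned here becomes part of the output path.
        return "output-" + uuid;
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}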




Best,
Haibo

At 2019-07-02 04:15:07, "Hailu, Andreas"  wrote:


Hello Flink team,

 

I’m writing Avro and Parquet files to HDFS, and I would like to include a 
UUID as a part of the file name.

 

Our files in HDFS currently follow this pattern:

 

tmp-r-1.snappy.parquet

tmp-r-2.snappy.parquet

...

 

I’m using a custom output format which extends a RichOutputFormat - is this 
something which is natively supported? If so, could you please recommend how 
this could be done, or share the relevant document?

 

Best,

Andreas






Re: Batch mode with Flink 1.8 unstable?

2019-07-02 Thread Till Rohrmann
Thanks for the update Ken. The input splits seem to
be org.apache.hadoop.mapred.FileSplit. Nothing too fancy pops into my eye.
Internally they use org.apache.hadoop.mapreduce.lib.input.FileSplit which
stores a Path, two long pointers and two string arrays with hosts and host
infos. I would assume that they are not exceeding the 10 MB framesize limit.

Once you see the problem happen again, it would also be helpful to save the
logs.

Cheers,
Till

On Tue, Jul 2, 2019 at 2:21 AM Ken Krugler 
wrote:

> Hi Stephan,
>
> Thanks for responding, comments inline below…
>
> Regards,
>
> — Ken
>
> On Jun 26, 2019, at 7:50 AM, Stephan Ewen  wrote:
>
> Hi Ken!
>
> Sorry to hear you are going through this experience. The major focus on
> streaming so far means that the DataSet API has stability issues at scale.
> So, yes, batch mode in current Flink version can be somewhat tricky.
>
> It is a big focus of Flink 1.9 to fix the batch mode, finally, and by
> addressing batch specific scheduling / recovery / and shuffle issues.
>
> Let me go through the issues you found:
>
> *(1) Input splits and oversized RPC*
>
> Your explanation seems correct, timeout due to dropping oversized RPC
> message.
>
> I don't quite understand how that exactly happens, because the size limit
> is 10 MB and input splits should be rather small in most cases.
> Are you running custom sources which put large data into splits? Maybe
> accidentally, by having a large serialized closure in the splits?
>
>
> As per my email to Till, I don’t feel like I’m doing anything tricky,
> though I am reading Hadoop sequence files that contain Cascading
> Tuple/Tuple key/value data.
>
> The fix would be this issue:
> https://issues.apache.org/jira/browse/FLINK-4399
>
> *(2) TM early release*
>
> The 1.8 version had a fix that should work for regular cases without
> fine-grained failure recovery.
> 1.9 should have a more general fix that also works for fine-grained
> recovery
>
> Are you trying to use the finer grained failover with the batch job?
>
>
> No, or at least I’m not doing anything special to enable it.
>
> Is there something I need to do to explicitly _disable_ it?
>
> The finer-grained failover is not working in batch for 1.8, that is why it
> is not an advertised feature (it only works for streaming so far).
>
> The goal is that this works in the 1.9 release (aka the batch fixup
> release)
>
> (3) Hang in Processing
>
> I think a thread dump (jstack) from the TMs would be helpful to diagnose
> that.
> There are known issues with the current batch shuffle implementation,
> which is why 1.9 is getting a new bounded-blocking stream shuffle
> implementation.
>
>
> Next time it happens, I’ll dump the threads.
>
> I should have done it this time, but was in a hurry to kill the EMR
> cluster as it had been costing money all night long :(
>
>
>
> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler 
> wrote:
>
>> Hi all,
>>
>> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink
>> 1.8.0, and it regularly fails, but for varying reasons.
>>
>> Has anyone else had stability with 1.8.0 in batch mode and non-trivial
>> workflows?
>>
>> Thanks,
>>
>> — Ken
>>
>> *1. TimeoutException getting input splits*
>>
>> The batch job starts by processing a lot of files that live in S3. During
>> this phase, I sometimes see:
>>
>> 2019-06-20 01:20:22,659 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
>> DataSource (at createInput(ExecutionEnvironment.java:549)
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad
>> dailies) -> Filter (Filter at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key
>> Extractor) -> Combine (Reduce at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32)
>> (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
>> java.lang.RuntimeException: Could not retrieve next input split.
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
>> Requesting the next input split failed.
>> at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>> ... 3 more
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> 

Could not load the native RocksDB library

2019-07-02 Thread Patro, Samya
Hello,
I am using rocksdb for storing state . But when I run the pipeline I get the 
error   "Could not load the native RocksDB library" .  Kindly can you check the 
configs and error stacktrace and suggest what am I doing wrong .

Flink version  - 1.8.0



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.8.0</version>
</dependency>


This is  the flink checkpointing config I have used

executionEnvironment.enableCheckpointing(30);
executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5);
executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60);
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
StateBackend rocksDbBackend = new 
RocksDBStateBackend(parameter.get("stateBackendPath"),true);
executionEnvironment.setStateBackend(rocksDbBackend);

When I run the pipeline, I get this error

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for StreamFlatMap_9dd63673dd41ea021b896d5203f3ba7c_(1/5) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 5 more
Caused by: java.io.IOException: Could not load the native RocksDB library
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:911)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:482)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 7 more
Caused by: java.lang.UnsatisfiedLinkError: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
undefined symbol: malloc_stats_print
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at 
org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:888)
... 11 more


Thanks and  Regards,
Samya Ranjan Patro
Goldman sachs






Re: Connection refused while trying to query state

2019-07-02 Thread Kostas Kloudas
Hi Avi,

Do you point the client to the correct address? This means where the
"Queryable State Proxy Server @ ..." says?

Cheers,
Kostas

On Sun, Jun 30, 2019 at 4:37 PM Avi Levi  wrote:

> Hi,
> I am trying to query state (cluster 1.8.0 is running on my local machine) .
> I do see in the logs "Started the Queryable State Proxy Server @ ...".
>
> but when I am trying to query the state from the client ,
> val descriptor = new ValueStateDescriptor("queryable-state",
> Types.CASE_CLASS[State])
> client.getKvState(jobId, "seen-domains",key,
> BasicTypeInfo.STRING_TYPE_INFO, descriptor)
> I am getting the following exception :
>
> [ERROR] [06/30/2019 16:59:48.850]
> [bvAkkaHttpServer-akka.actor.default-dispatcher-10]
> [akka.actor.ActorSystemImpl(AkkaHttpServer)] Error during processing of
> request:
> 'org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection refused: /127.0.0.1:9069'. Completing with 500 Internal Server
> Error response. To change default exception handling behavior, provide a
> custom ExceptionHandler.
> java.util.concurrent.CompletionException:
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection refused: /127.0.0.1:9069
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.queryablestate.network.Client$PendingConnection.close(Client.java:377)
> at
> org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:270)
> at
> org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:231)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection refused: /127.0.0.1:9069
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
> ... 6 more
> Caused by: java.net.ConnectException: Connection refused
> ... 10 more
>
>


Re: Flink Kafka ordered offset commit & unordered processing

2019-07-02 Thread Piotr Nowojski
Hi,

If your async operations are stalled, this will eventually cause problems. 
Either this will back-pressure the sources (the async operator's queue will become 
full) or you will run out of memory (if you configured the queue's capacity too 
high). I think the only possible solution is either to drop records in some 
way, or to spill them to some storage for later processing (assuming that the 
storage will not overflow/will not cause stalls on its own).
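
As a rough illustration (a sketch only; ExternalLookupFunction, the timeout and the 
capacity value below are made-up placeholders), the capacity is the last argument of 
the orderedWait call:

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // The capacity (here 100) bounds the number of in-flight async requests.
    // Once it is exhausted, the async operator back-pressures the Kafka source.
    DataStream<Result> enriched = AsyncDataStream.orderedWait(
            input,                          // DataStream<Event> read from Kafka
            new ExternalLookupFunction(),   // an AsyncFunction<Event, Result>
            30, TimeUnit.SECONDS,           // timeout per async request
            100);                           // capacity of the operator's queue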

Regarding the Kafka offsets, as you wrote, Flink’s KafkaConsumer is not using 
internal Kafka offsets for recovery - for this purpose Kafka offsets are stored 
inside Flink’s state.

Regarding the checkpointing you can read about how it’s being done in general 
in the docs [1]. Once barrier alignment for the async operator is done, it 
checkpoints its state. Part of this state are the queues of elements that are 
currently being processed asynchronously. So if failure happens, after recovery 
all of the operators (sources, async operator, sinks, …) are restored 
effectively to the same logical point of time. In case of async operator, async 
operations that were caught in the middle of processing when checkpoint 
barriers arrived are resubmitted/retried.

I hope that answers your questions :)

Piotrek 

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html
 


> On 30 Jun 2019, at 04:47, wang xuchen  wrote:
> 
> Hi Flink experts,
> 
> I am prototyping a real-time system that reads from a Kafka source with Flink 
> and calls out to an external system as part of the event processing. One of 
> the most important requirements is that reading from Kafka should NEVER stall, 
> even when some async external calls are slow while certain Kafka offsets are 
> being held. At-least-once processing is good enough. 
> 
> Currently, I am using AsyncIO with a thread pool of size 20. My understanding 
> is that if I use orderedWait with a large 'capacity', consumption from Kafka 
> should continue even if some external calls experience slowness (holding the 
> offsets) as long as the capacity is not exhausted. 
> 
> (From my own reading of the Flink source code, the capacity of the orderedWait 
> function translates to the size of the OrderedStreamElementQueue.)
> 
> However, I expect that while the external calls are stuck, the stream source 
> should keep pumping out of Kafka as long as there is still capacity, but offsets 
> after the stuck record should NOT be committed back to Kafka (and the 
> checkpoint should also stall to accommodate the stalled offsets?)
> 
> My observation is, if I set the capacity large enough (max_int / 100 for 
> instance), the consumption was not stalled (which is good), but the offsets 
> were all committed back to Kafka AFTER the stalled records, all checkpoints 
> succeeded, and no back pressure was incurred.
> 
> In this case, if some machines crash, how does Flink recover the stalled 
> offsets? Which checkpoint does Flink roll back to? I understand that 
> committing offsets back to Kafka is merely to show progress to external 
> monitoring tools, but I hope Flink does bookkeeping somewhere to journal 
> that async call xyz did not return and should be retried during recovery.
> 
> Thanks a lot
> Ben
> 
> 
> 
> 



Re: Maybe a flink bug. Job keeps in FAILING state

2019-07-02 Thread Till Rohrmann
Thanks for reporting this problem Joshua. I think this is actually a
problem we should fix. The cause seems to be that we swallow the OOM
exception when calling `Task#failExternally`. Probably we don't set the
right uncaught exception handler in the thread which executes the
checkpoint. Let's continue our discussion on the JIRA issue.
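
As a side note (this is only a suggestion, not something Flink configures by
default): the "make the TM exit on OOM" behaviour discussed further down in this
thread can already be approximated at the JVM level via the TaskManager JVM
options in flink-conf.yaml:

    # Kill the TaskManager JVM as soon as an OutOfMemoryError is thrown
    # (flag available since JDK 8u92); the cluster manager can then restart it.
    env.java.opts.taskmanager: -XX:+ExitOnOutOfMemoryError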

Thanks Zhijiang for analysing the problem.

Cheers,
Till

On Tue, Jun 25, 2019 at 5:21 AM zhijiang  wrote:

> Thanks for opening this ticket and I would watch it.
>
> Flink does not handle OOM issues specially. I remember we discussed a
> similar issue before, but I forgot the conclusion, or there may have been
> other concerns about it.
> I am not sure whether it is worth fixing at the moment; maybe Till or Chesnay
> could give a final decision.
>
> Best,
> Zhijiang
>
> --
> From:Joshua Fan 
> Send Time: Tue, Jun 25, 2019 11:10
> To:zhijiang 
> Cc:Chesnay Schepler ; user ;
> Till Rohrmann 
> Subject:Re: Maybe a flink bug. Job keeps in FAILING state
>
> Hi Zhijiang
>
> Thank you for your analysis. I agree with it. The solution may be to let the
> TM exit, like you mentioned, when any type of OOM occurs, because Flink
> has no control over a TM once an OOM occurs.
>
> I fired a jira before, https://issues.apache.org/jira/browse/FLINK-12889.
>
> Don't know if it is worth fixing.
>
> Thank you all.
>
> Yours sincerely
> Joshua
>
> On Fri, Jun 21, 2019 at 5:32 PM zhijiang 
> wrote:
> Thanks for the reminder, @Chesnay Schepler.
>
> I just looked through the related logs. Actually, none of the five
> "Source: ServiceLog" tasks are in a terminal state from the JM's view; the
> relevant processes are as follows:
>
> 1. The checkpoint in the task causes an OOM issue, which calls
> `Task#failExternally` as a result; we can see the log "Attempting to
> fail task externally" in the TM.
> 2. The source task transitions from RUNNING to FAILED and then
> starts a canceler thread for canceling the task; we can see the log "Triggering
> cancellation of task" in the TM.
> 3. When the JM starts to cancel the source tasks, the RPC call
> `Task#cancelExecution` finds the task already in the FAILED state from
> step 2 above; we can see the log "Attempting to cancel task" in the TM.
>
> In the end, none of the five source tasks reached a terminal state according
> to the JM log. I guess step 2 might not have created the canceler thread
> successfully, because the root failover was caused by an OOM while creating a
> native thread in step 1, so it is possible that creating the canceler thread
> also failed in this unstable OOM situation. If so, the source task would not
> have been interrupted at all and would not have reported to the JM either,
> even though its state had already changed to FAILED.
>
> The other vertex tasks do not trigger `Task#failExternally` in
> step 1 and only receive the cancel RPC from the JM in step 3. I guess that by
> this time, later than when the sources failed, the canceler thread could be
> created successfully after some GCs, so these tasks could be canceled and
> reported to the JM side.
>
> I think the key problem is that under OOM conditions some behaviors are not
> within expectations, which might cause problems. Maybe we should handle OOM
> errors in an extreme way, like making the TM exit, to solve the potential issue.
>
> Best,
> Zhijiang
> --
> From:Chesnay Schepler 
> Send Time: Fri, Jun 21, 2019 16:34
> To:zhijiang ; Joshua Fan <
> joshuafat...@gmail.com>
> Cc:user ; Till Rohrmann 
> Subject:Re: Maybe a flink bug. Job keeps in FAILING state
>
> The logs are attached to the initial mail.
>
> Echoing my thoughts from earlier: from the logs it looks as if the TM
> never even submits the terminal state RPC calls for several tasks to the JM.
>
> On 21/06/2019 10:30, zhijiang wrote:
> Hi Joshua,
>
> If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really
> in the CANCELED state on the TM side but in the CANCELING state on the JM side,
> then it might indicate that the terminal state RPC was not received by the JM.
> I am not sure whether the OOM could cause this to happen, resulting in
> unexpected behavior.
>
> In addition, you mentioned these tasks were still active after the OOM and were
> asked to cancel, so I am not sure which specific period your attached TM stack
> covers. It would help if you could provide the corresponding TM and JM logs.
> From the TM log it is easy to check the tasks' final states.
>
> Best,
> Zhijiang
> --
> From:Joshua Fan  
> Send Time: Thu, Jun 20, 2019 11:55
> To:zhijiang  
> Cc:user  ; Till Rohrmann
>  ; Chesnay Schepler
>  
> Subject:Re: Maybe a flink bug. Job keeps in FAILING state
>
> zhijiang
>
> I did not capture the job UI. The topology is in FAILING state, but the
> persistentbolt subtasks, as can be seen in the picture attached in the first
> mail, were all canceled, and the parsebolt subtasks, as described before, had
> one 

Re: RE: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

2019-07-02 Thread vino yang
Hi all,

In the past, I have tried to further refine the design discussed in this thread
and have written a design document with more detailed diagrams and textual
descriptions, so that it is more conducive to discussion. [1]

Note: The document is not yet completed, for example, the "Implementation"
section is missing. Therefore, it is still in an open discussion state. I
will improve the rest while listening to the opinions of the community.

Welcome and appreciate more discussions and feedback.

Best,
Vino

[1]:
https://docs.google.com/document/d/181qYVIiHQGrc3hCj3QBn1iEHF4bUztdw4XO8VSaf_uI/edit?usp=sharing


yanghua1127  wrote on Fri, Jun 7, 2019 at 11:32 PM:

> Hi Georgi,
>
> Thanks for your feedback. And glad to hear you are using queryable state.
>
> I agree that the implementation of option 1 is easier than the others. However,
> when we design the new architecture we need to consider more aspects, e.g.
> scalability. So it seems option 3 is more suitable. Actually, some
> committers such as Stefan, Gordon and Aljoscha have given me feedback and
> direction.
>
> Currently, I am writing the design document. If it is ready to be
> presented. I will copy to this thread and we can discuss further details.
>
> 
> Best,
> Vino
>
>
> On 2019-06-07 19:03, Georgi Stoyanov  wrote:
>
> Hi Vino,
>
>
>
> I was investigating the current architecture and AFAIK the first proposal
> will be a lot easier to implement, because currently the JM has the information
> about the states (where, which, etc., thanks to the KvStateLocationRegistry;
> correct me if I'm wrong).
>
> We are using the feature and it's indeed not very convenient to iterate through
> ports, check which TM is the responsible one, etc.
>
>
>
> It will be very useful if someone from the committers joins the topic and
> gives us some insight into what's going to happen with that feature.
>
>
>
>
>
> Kind Regards,
>
> Georgi
>
>
>
>
>
>
>
> *From:* vino yang 
> *Sent:* Thursday, April 25, 2019 5:18 PM
> *To:* dev ; user 
> *Cc:* Stefan Richter ; Aljoscha Krettek <
> aljos...@apache.org>; kklou...@gmail.com
> *Subject:* [DISCUSS] Improve Queryable State and introduce a
> QueryServerProxy component
>
>
>
> Hi all,
>
>
>
> I want to share my thoughts with you about improving the queryable state
> and introducing a QueryServerProxy component.
>
>
>
> I think the current queryable state client is hard to use, because it
> requires users to know the TaskManager's address and the proxy's port. In
> practice, some business users do not have good knowledge of Flink's internals
> or runtime in production, yet they sometimes need to query the values
> of states.
>
>
>
> IMO, the cause of this problem is the queryable state's
> architecture. Currently, queryable state clients interact with
> query state client proxy components hosted on each TaskManager. This
> design makes it hard to encapsulate the point of change and exposes too much
> detail to the user.
>
>
>
> My personal idea is that we could introduce a real queryable state
> server, named e.g. *QueryStateProxyServer*, which would receive all
> state query requests, look up the local registry, and then redirect each request
> to the specific *QueryStateClientProxy* (running on each TaskManager). This
> server is all that users would really need to care about, and it would free
> users from knowing the TaskManagers' addresses and the proxies' ports. The
> current *QueryStateClientProxy* would become the *QueryStateProxyClient*.
>
>
>
> Generally speaking, the roles of the QueryStateProxyServer are listed below:
>
>
>
>   - acts as the proxy for all query clients, receiving all requests and
>     sending responses;
>   - a router that redirects the real query requests to the specific proxy
>     client;
>   - maintains the routing table registry (state <-> TaskManager,
>     TaskManager <-> proxy client address);
>   - more fine-grained control, such as result caching, ACLs, TTL, SLA (rate
>     limiting), and so on
>
> About the implementation, there are three opts:
>
>
>
> opt 1:
>
>
>
> Let the JobManager act as the query proxy server.
>
> ·  pros: reuses the existing JM; not having to introduce a new process
> reduces complexity;
>
> ·  cons: would place a heavy burden on the JM; depending on the query
> frequency, it may impact stability
>
>
>
> [image: Screen Shot 2019-04-25 at 5.12.07 PM.png]
>
>
>
> opt 2:
>
>
>
> Introduce a new component which runs as a single process and acts as the
> query proxy server:
>
> ·  pros: reduces the burden on the JM and makes it more stable
>
> ·  cons: introducing a new component will make the implementation more
> complex
>
> [image: Screen Shot 2019-04-25 at 5.14.05 PM.png]
>
>
>
> opt 3 (suggestion from Stefan Richter):
>
>
>
> Combining the two options, the query server could run as a single entry
> point (process) and integrate with the JobManager.
>
>
>
> If we keep it well encapsulated, the only difference would be how we
> register new TMs with the query server in the different scenarios, in JM we
> might have this information 

Re: File Naming Pattern from HadoopOutputFormat

2019-07-02 Thread Yitzchak Lieberman
Regarding option 2 for Parquet:
implementing a BucketAssigner won't set the file name, since getBucketId()
defines the directory for the files when partitioning the data,
for example:
/day=20190101/part-1-1
there is an open issue for that:
https://issues.apache.org/jira/browse/FLINK-12573
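
To illustrate the distinction, here is a rough sketch of a custom BucketAssigner
(MyEvent and its getDay() accessor are made-up placeholders): getBucketId() only
picks the bucket directory, such as the "day=20190101" part of the path above,
while the "part-..." file names are generated by the sink itself.

    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    public class DayBucketAssigner implements BucketAssigner<MyEvent, String> {

        @Override
        public String getBucketId(MyEvent element, BucketAssigner.Context context) {
            // Becomes the directory (bucket) name, not the part file name.
            return "day=" + element.getDay();
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }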

On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun  wrote:

> Hi, Andreas
>
> I think the following things may be what you want.
>
> 1. For writing Avro, I think you can extend AvroOutputFormat and override
> the  getDirectoryFileName() method to customize a file name, as shown below.
> The javadoc of AvroOutputFormat:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html
>
>   public static class CustomAvroOutputFormat<E> extends AvroOutputFormat<E> {
>       public CustomAvroOutputFormat(Path filePath, Class<E> type) {
>           super(filePath, type);
>       }
>
>       public CustomAvroOutputFormat(Class<E> type) {
>           super(type);
>       }
>
>       @Override
>       public void open(int taskNumber, int numTasks) throws IOException {
>           this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
>           super.open(taskNumber, numTasks);
>       }
>
>       @Override
>       protected String getDirectoryFileName(int taskNumber) {
>           // returns a custom filename
>           return null;
>       }
>   }
>
>
> 2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase,
> StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create
> a class that implements the BucketAssigner interface and return a custom
> file name in the getBucketId() method (the value returned by getBucketId()
> will be treated as the file name).
>
> ParquetStreamingFileSinkITCase:
> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>
> StreamingFileSink#forBulkFormat:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
>
> DateTimeBucketAssigner:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
>
>
> Best,
> Haibo
>
> At 2019-07-02 04:15:07, "Hailu, Andreas"  wrote:
>
> Hello Flink team,
>
>
>
> I'm writing Avro and Parquet files to HDFS, and I would like to include
> a UUID as a part of the file name.
>
>
>
> Our files in HDFS currently follow this pattern:
>
>
>
> *tmp-r-1.snappy.parquet*
>
> *tmp-r-2.snappy.parquet*
>
> *...*
>
>
>
> I’m using a custom output format which extends a RichOutputFormat - is
> this something which is natively supported? If so, could you please
> recommend how this could be done, or share the relevant document?
>
>
>
> Best,
>
> Andreas
>
> --
>
>
>


Flink not able to load native rocksdb library

2019-07-02 Thread Patro, Samya
Hello,
I am using RocksDB for storing state, but when I run the pipeline I get the 
error "Could not load the native RocksDB library". Could you please check the 
configs and error stack trace below and suggest what I am doing wrong?

Flink version  - 1.8.0



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.8.0</version>
</dependency>


This is  the flink checkpointing config I have used

executionEnvironment.enableCheckpointing(30);
executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5);
executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60);
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
StateBackend rocksDbBackend = new 
RocksDBStateBackend(parameter.get("stateBackendPath"),true);
executionEnvironment.setStateBackend(rocksDbBackend);

When I run the pipeline, I get this error

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for StreamFlatMap_9dd63673dd41ea021b896d5203f3ba7c_(1/5) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 5 more
Caused by: java.io.IOException: Could not load the native RocksDB library
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:911)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:482)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 7 more
Caused by: java.lang.UnsatisfiedLinkError: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
undefined symbol: malloc_stats_print
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at 
org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:888)
... 11 more


Thanks and  Regards,
Samya Ranjan Patro
Goldman sachs





Re: Setting consumer offset

2019-07-02 Thread Paul Lam
Hi Avi,

Yes, it will. The restored state takes priority over the start position.
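
In other words (a minimal sketch; the topic, timestamp and properties below are
placeholders):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

    // Only used when the job starts without restored state; when restoring from
    // a savepoint/checkpoint, the offsets stored in that state take priority.
    consumer.setStartFromTimestamp(1561939200000L);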

Best,
Paul Lam

> On Jul 2, 2019, at 15:11, Avi Levi  wrote:
> 
> Hi,
> If I set the consumer offset in code, e.g. consumer.setStartFromTimestamp, and I 
> start the job from a certain savepoint/checkpoint, will the offset in the 
> checkpoint override the offset that is defined in the code?
> 
> Best Regards
> Avi
> 



Setting consumer offset

2019-07-02 Thread Avi Levi
Hi,
If I set the consumer offset in code, e.g. *consumer.setStartFromTimestamp*,
and I start the job from a certain savepoint/checkpoint, will the offset in
the checkpoint override the offset that is defined in the code?

Best Regards
Avi