Re: How to install Flink + YARN?

2019-11-19 Thread Pankaj Chand
Thank you, Ana and Yang!

On Tue, Nov 19, 2019, 9:29 PM Yang Wang  wrote:

> Hi Pankaj,
>
> First, you need to prepare a Hadoop environment separately, including HDFS
> and YARN. If you are familiar
> with Hadoop, you could download the binary [1] and start the cluster on your
> nodes manually. Otherwise,
> some tools may help you deploy a Hadoop cluster, such as Ambari [2] and Cloudera
> Manager [3].
>
> Then, download the Flink with Hadoop pre-bundle. You can submit your flink
> job now.
>
> Please make sure you set the correct HADOOP_CONF_DIR in your flink client
> before starting a flink cluster.
>
>
>
> [1].https://hadoop.apache.org/releases.html
> [2].
> https://www.cloudera.com/products/open-source/apache-hadoop/apache-ambari.html
> [3].
> https://www.cloudera.com/products/product-components/cloudera-manager.html
>
> Ana wrote on Wed, Nov 20, 2019, 10:12 AM:
>
>> Hi,
>>
>> I was able to run Flink on YARN by installing YARN and Flink separately.
>>
>> Thank you.
>>
>> Ana
>>
>> On Wed, Nov 20, 2019 at 10:42 AM Pankaj Chand 
>> wrote:
>>
>>> Hello,
>>>
>>> I want to run Flink on YARN upon a cluster of nodes. From the
>>> documentation, I was not able to fully understand how to go about it. Some
>>> of the archived answers are a bit old and had pending JIRA issues, so I
>>> thought I would ask.
>>>
>>> Am I first supposed to install YARN separately, and then download the
>>> Flink file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put
>>> into Flink's /lib folder provide the entire YARN installation?
>>>
>>> Is there any download that bundles a complete *installation of Flink +
>>> installation of YARN*?
>>>
>>> Thank you,
>>>
>>> Pankaj
>>>
>>


How to estimate the memory size of flink state

2019-11-19 Thread 刘建刚
  We are using Flink 1.6.2. For the filesystem backend, we want to monitor
the state size in memory. Once the state size grows too large, we want to be
notified so we can take measures such as rescaling the job; otherwise the job may
fail because of memory pressure.
  We have tried monitoring the JVM memory usage, e.g. GC throughput, but in our
case the state can vary greatly at peak times, so we would rather refer to
the state memory size directly.
  I checked the metrics and the code, but didn't find any information about
the state memory size. I can get the checkpoint size, but that is a
serialized result which does not reflect the running state in memory. Can
anyone give me some suggestions? Thank you very much.
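
For reference, the serialized size mentioned above is exposed per job through the
checkpoint statistics of the REST API; a minimal sketch (host, port and job id are
placeholders):

  # Returns checkpoint counts and sizes (serialized state), not the in-memory footprint
  curl http://<jobmanager-host>:8081/jobs/<job-id>/checkpoints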


Re: Re: how to setup a ha flink cluster on k8s?

2019-11-19 Thread Rock
Hi Yang Wang,


Thanks for your reply. I may have set up an HA cluster successfully. The reason I
couldn't set it up before may be a bug related to S3 in Flink; after changing to HDFS,
I can run it successfully.


But after about one day of running, the job manager crashes and can't recover
automatically. I have to re-apply the job manager deployment manually (and that
fixes the problem; my jobs start again automatically), which is strange.


Since I changed a lot compared to the YAML in Flink's docs, I really don't know
where my configuration is wrong. But I have added Logback to Flink and let
it send its logs to my Elasticsearch cluster; maybe the logs can tell more.


------------------ Original message ------------------
From: "Yang Wang"
[1]. https://github.com/pravega/zookeeper-operator
[2]. https://github.com/apache/flink/blob/release-1.9/flink-container/kubernetes/README.md#deploy-flink-job-cluster


vino yang: http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/


Rock: I followed
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html
but this is not HA: when the job manager goes down and is rescheduled,
the metadata for running jobs is lost.

I tried the ZooKeeper HA setup from
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html
on k8s, but couldn't get it right.

Storing a job's metadata on k8s using a PVC or another external file
system should be very easy. Is there a way to achieve it?
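
For reference, a minimal sketch of the ZooKeeper HA settings referred to above, in
flink-conf.yaml (quorum addresses and paths are placeholders):

  high-availability: zookeeper
  high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
  high-availability.storageDir: hdfs:///flink/ha/
  high-availability.cluster-id: /my-flink-cluster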

Re: [DISCUSS] Support configure remote flink jar

2019-11-19 Thread tison
Thanks for your participation!

@Yang: Great to hear. I'd like to know whether or not a remote flink jar
path conflicts with FLINK-13938. IIRC FLINK-13938 automatically excludes the local
flink jar from shipping, which possibly does not work for a remote one.

@Thomas: It is inspiring that a URL could become the unified representation of a
resource. I'm thinking about how to serve a unified process that fetches a resource
from a URL pointing to an artifact repository or a distributed file system.

@ouywl & Stephan: Yes, this improvement can be carried over to environments like
k8s; IIRC the k8s proposal already discussed an improvement using "init
container" and other technologies. However, so far I regard these as improvements
that differ from one storage to another, so we can achieve them individually.


Best,
tison.


Stephan Ewen wrote on Wed, Nov 20, 2019, 12:34 AM:

> Would that be a feature specific to Yarn? (and maybe standalone sessions)
>
> For containerized setups, an init container seems like a nice way to
> solve this. Also more flexible, when it comes to supporting authentication
> mechanisms for the target storage system, etc.
>
> On Tue, Nov 19, 2019 at 5:29 PM ouywl  wrote:
>
>> I have implemented this feature in our env, using Docker's 'Init Container'
>> to get the jar file from a URL. It seems a good idea.
>>
>> ouywl
>> ou...@139.com
>>
>> 
>> Signature by NetEase Mail Master
>>
>> On 11/19/2019 12:11,Thomas Weise  wrote:
>>
>> There is a related use case (not specific to HDFS) that I came across:
>>
>> It would be nice if the jar upload endpoint could accept the URL of a jar
>> file as an alternative to the jar file itself. Such a URL could point to an
>> artifact repository or a distributed file system.
>>
>> Thomas
>>
>>
>> On Mon, Nov 18, 2019 at 7:40 PM Yang Wang  wrote:
>>
>>> Hi tison,
>>>
>>> Thanks for starting this discussion.
>>> * For a user-customized flink-dist jar, it is a useful feature, since it
>>> avoids uploading the flink-dist jar
>>> every time. Especially in a production environment, it could accelerate the
>>> submission process.
>>> * For the standard flink-dist jar, FLINK-13938[1] could solve
>>> the problem: upload an official Flink release
>>> binary to distributed storage (HDFS) first, and then all submissions
>>> could benefit from it. Users could
>>> also upload a customized flink-dist jar to accelerate their submissions.
>>>
>>> If the flink-dist jar can be specified as a remote path, maybe the user
>>> jar is in the same situation.
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-13938
>>>
>>> tison wrote on Tue, Nov 19, 2019, 11:17 AM:
>>>
>>> > Hi folks,
>>> >
>>> > Recently, our customers have asked for a feature to configure a remote flink jar.
>>> > I'd like to reach out to you
>>> > to see whether or not it is a general need.
>>> >
>>> > ATM Flink only supports configuring a local file as the flink jar via the `-yj`
>>> > option. If we pass an HDFS file
>>> > path, due to an implementation detail it will fail with
>>> > IllegalArgumentException. In the story where we support
>>> > configuring a remote flink jar, this limitation is eliminated. We also make
>>> > use of YARN locality to reduce
>>> > uploading overhead, instead asking YARN to localize the jar when
>>> > the AM container is started.
>>> >
>>> > Besides, it possibly has overlap with FLINK-13938. I'd like to put the
>>> > discussion on our
>>> > mailing list first.
>>> >
>>> > Are you looking forward to such a feature?
>>> >
>>> > @Yang Wang: this feature is different from that we discussed offline,
>>> it
>>> > only focuses on flink jar, not
>>> > all ship files.
>>> >
>>> > Best,
>>> > tison.
>>> >
>>>
>>
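
For context, a sketch of the `-yj` behaviour tison describes (paths and versions are
illustrative):

  # Works today: the flink-dist jar is a local file, uploaded on every submission
  ./bin/flink run -m yarn-cluster -yj ./lib/flink-dist_2.11-1.9.1.jar ./examples/streaming/WordCount.jar

  # Fails today with IllegalArgumentException; the proposal would allow it
  ./bin/flink run -m yarn-cluster -yj hdfs:///flink/lib/flink-dist_2.11-1.9.1.jar ./examples/streaming/WordCount.jar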


Re: Re: Error when starting a job from a savepoint: migration for MapState currently isn't supported.

2019-11-19 Thread claylin
With 1.9.1 I now get:
Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible.



------------------ Original message ------------------
From: "shuwen zhou"
[1] https://issues.apache.org/jira/browse/FLINK-11947
 Best,
 Congxian


 claylin <1012539...@qq.com> wrote on Thu, Nov 14, 2019, 9:35 PM:

 > When starting from a savepoint I got the following error. I'm using 1.8.1.
 > Has anyone run into this? Looking for a solution.
 > java.lang.RuntimeException: Error while getting state
 >   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
 >   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
 >   at com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
 >   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 >   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 >   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
 >   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
 >   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
 >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 >   at java.lang.Thread.run(Thread.java:748)
 > Caused by: org.apache.flink.util.StateMigrationException: The new serializer for a
 > MapState requires state migration in order for the job to proceed. However,
 > migration for MapState currently isn't supported.
 >   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
 >   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
 >   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
 >   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
 >   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
 >   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
 >   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
 >   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
 >   at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
 >   at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
 >   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
 >   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
 >   ... 9 more



-- 
Best Wishes,
Shuwen Zhou 

Re: Error when starting a job from a savepoint: migration for MapState currently isn't supported.

2019-11-19 Thread shuwen zhou
Did it succeed?
On my side I get a different error: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible. The version I'm using is Flink 1.9.0.
The exact steps are:
after triggering a savepoint, reading from the savepoint produces this error.
I'm using MapState[String, Void] in Scala code.


... 25 more
 Caused by: java.io.IOException: Failed to open user defined function
at
org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
at
org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243)
at tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206)
... 6 more
 Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)

On Fri, 15 Nov 2019 at 10:10, claylin <1012539...@qq.com> wrote:

> Thanks, I did indeed change the state schema on my side. I'll try it now and see.
>
>
>
>
> ------------------ Original message ------------------
> From: "Congxian Qiu"  Sent: Friday, Nov 15, 2019, 10:07 AM
> To: "user-zh"
> Subject: Re: Error when starting a job from a savepoint: migration for MapState currently isn't
> supported.
>
>
>
> Hi
> It looks like this is caused by MapState migration not being supported. You could try 1.9; 1.9 solved MapState value schema
> evolution[1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-11947
> Best,
> Congxian
>
>
> claylin <1012539...@qq.com> wrote on Thu, Nov 14, 2019, 9:35 PM:
>
>  When starting from a savepoint I got the following error. I'm using 1.8.1. Has anyone run into this? Looking for a solution.
>  java.lang.RuntimeException: Error while getting state at
> 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
>  at
> 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
>  at
> 
> com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
>  at
> 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at
> 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at
> 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748) Caused by:
>  org.apache.flink.util.StateMigrationException: The new serializer for
> a
>  MapState requires state migration in order for the job to proceed.
> However,
>  migration for MapState currently isn't
> supported. at
> 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
>  at
> 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
>  at
> 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
>  at
> 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
>  at
> 
> 
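
To illustrate the MapState value schema evolution mentioned above, a hedged Scala sketch
(state name and types are made up, not the code from this thread): the value type of the
descriptor differs between the savepoint and the restore, which is what sends the restore
down the serializer-compatibility/migration path seen in the stack traces.

  import org.apache.flink.api.common.state.MapStateDescriptor

  // Sketch only: state as registered when the savepoint was taken
  val oldDescriptor = new MapStateDescriptor[String, java.lang.Long](
    "seen", classOf[String], classOf[java.lang.Long])

  // Sketch only: same state name restored with a different value type;
  // restoring now requires value-schema migration of the MapState
  val newDescriptor = new MapStateDescriptor[String, String](
    "seen", classOf[String], classOf[String])

  // getRuntimeContext.getMapState(newDescriptor) during open() is where the
  // StateMigrationException in the traces above surfaces.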

Re: YARN : Different cutoff for job and task managers

2019-11-19 Thread Yang Wang
Hi Gwenhael,

I'm afraid we cannot set different cut-offs for the jobmanager and the
taskmanager. You could set
the JVM args manually as a workaround.
For example, 'env.java.opts.jobmanager=-Xms3072m -Xmx3072m'.
In most JVM implementations, the rightmost -Xmx/-Xms takes effect, so I
think it should work.
Please have a try.
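
For reference, a sketch of that workaround in flink-conf.yaml (the heap sizes are examples):

  # Give the JobManager its own JVM heap regardless of the cut-off applied elsewhere
  env.java.opts.jobmanager: -Xms3072m -Xmx3072m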


Best,
Yang

Gwenhael Pasquiers wrote on Tue, Nov 19, 2019, 10:56 PM:

> Hello,
>
>
>
> In a setup where we allocate most of the memory to RocksDB (off-heap) we
> have an important cutoff.
>
>
>
> Our issue is that the same cutoff applies to both task and job managers :
> the heap size of the job manager then becomes too low.
>
>
>
> Is there a way to apply different cutoffs to job and task managers ?
>
>
>
> Regards,
>


Re: How to install Flink + YARN?

2019-11-19 Thread Yang Wang
Hi Pankaj,

First, you need to prepare a Hadoop environment separately, including HDFS
and YARN. If you are familiar
with Hadoop, you could download the binary [1] and start the cluster on your
nodes manually. Otherwise,
some tools may help you deploy a Hadoop cluster, such as Ambari [2] and Cloudera
Manager [3].

Then, download the Flink with Hadoop pre-bundle. You can submit your flink
job now.

Please make sure you set the correct HADOOP_CONF_DIR in your flink client
before starting a flink cluster.



[1].https://hadoop.apache.org/releases.html
[2].
https://www.cloudera.com/products/open-source/apache-hadoop/apache-ambari.html
[3].
https://www.cloudera.com/products/product-components/cloudera-manager.html
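
A sketch of the submission flow described above, assuming a Flink 1.9 binary with the
Hadoop pre-bundle already in lib/ (paths are examples):

  # Point the Flink client at the Hadoop/YARN configuration
  export HADOOP_CONF_DIR=/etc/hadoop/conf

  # Submit a job to a per-job YARN cluster
  ./bin/flink run -m yarn-cluster ./examples/streaming/WordCount.jar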

Ana wrote on Wed, Nov 20, 2019, 10:12 AM:

> Hi,
>
> I was able to run Flink on YARN by installing YARN and Flink separately.
>
> Thank you.
>
> Ana
>
> On Wed, Nov 20, 2019 at 10:42 AM Pankaj Chand 
> wrote:
>
>> Hello,
>>
>> I want to run Flink on YARN upon a cluster of nodes. From the
>> documentation, I was not able to fully understand how to go about it. Some
>> of the archived answers are a bit old and had pending JIRA issues, so I
>> thought I would ask.
>>
>> Am I first supposed to install YARN separately, and then download the
>> Flink file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put
>> into Flink's /lib folder provide the entire YARN installation?
>>
>> Is there any download that bundles a complete *installation of Flink +
>> installation of YARN*?
>>
>> Thank you,
>>
>> Pankaj
>>
>


Re: How to install Flink + YARN?

2019-11-19 Thread Ana
Hi,

I was able to run Flink on YARN by installing YARN and Flink separately.

Thank you.

Ana

On Wed, Nov 20, 2019 at 10:42 AM Pankaj Chand 
wrote:

> Hello,
>
> I want to run Flink on YARN upon a cluster of nodes. From the
> documentation, I was not able to fully understand how to go about it. Some
> of the archived answers are a bit old and had pending JIRA issues, so I
> thought I would ask.
>
> Am I first supposed to install YARN separately, and then download the
> Flink file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put
> into Flink's /lib folder provide the entire YARN installation?
>
> Is there any download that bundles a complete *installation of Flink +
> installation of YARN*?
>
> Thank you,
>
> Pankaj
>


How to install Flink + YARN?

2019-11-19 Thread Pankaj Chand
Hello,

I want to run Flink on YARN upon a cluster of nodes. From the
documentation, I was not able to fully understand how to go about it. Some
of the archived answers are a bit old and had pending JIRA issues, so I
thought I would ask.

Am I first supposed to install YARN separately, and then download the Flink
file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put into
Flink's /lib folder provide the entire YARN installation?

Is there any download that bundles a complete *installation of Flink +
installation of YARN*?

Thank you,

Pankaj


CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-19 Thread Hailu, Andreas
Hi,

We're in the middle of testing the upgrade of our data processing flows from 
Flink 1.6.4 to 1.9.1. We're seeing that flows which were running just fine on 
1.6.4 now fail on 1.9.1 with the same application resources and input data 
size. It seems that there have been some changes around how the data is sorted 
prior to being fed to the CoGroup operator - this is the error that we 
encounter:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset 
| Merge | NONE)' , caused an error: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost.
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1109)
at 
org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:102)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:474)

I drilled further down into the YARN app logs, and I found that the container 
was running out of physical memory:

2019-11-19 12:49:23,068 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_e42_1574076744505_9444_01_04 because: Container 
[pid=42774,containerID=container_e42_1574076744505_9444_01_04] is running 
beyond physical memory limits. Current usage: 12.0 GB of 12 GB physical memory 
used; 13.9 GB of 25.2 GB virtual memory used. Killing container.

This is what leads to my suspicion, as this resource configuration worked just
fine on 1.6.4.

I'm working on getting heap dumps of these applications to better understand
what's causing the blowup in required physical memory, but it would be helpful
if anyone knew what relevant changes have been made between these versions, or
where else I could look. There are some features in 1.9 that we'd like to use in
our flows, so this issue, no pun intended, is keeping us from doing so.
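
For reference, the YARN heap cut-off that decides how much of the container memory is
withheld from the JVM heap can be tuned in flink-conf.yaml; a sketch with example values
(something to experiment with, not a confirmed fix for this regression):

  containerized.heap-cutoff-ratio: 0.35
  containerized.heap-cutoff-min: 1024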

Best,
Andreas





Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-19 Thread Bowen Li
Great work, glad to see this finally happening!

On Tue, Nov 19, 2019 at 6:26 AM Robert Metzger  wrote:

> Thanks.
>
> I added a ticket for this nice idea:
> https://github.com/ververica/flink-ecosystem/issues/84
>
> On Tue, Nov 19, 2019 at 11:29 AM orips  wrote:
>
>> This is great.
>>
>> Can we have RSS feed for this?
>>
>> Thanks
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: [DISCUSS] Support configure remote flink jar

2019-11-19 Thread Stephan Ewen
Would that be a feature specific to Yarn? (and maybe standalone sessions)

For containerized setups, an init container seems like a nice way to solve
this. Also more flexible, when it comes to supporting authentication
mechanisms for the target storage system, etc.

On Tue, Nov 19, 2019 at 5:29 PM ouywl  wrote:

> I have implemented this feature in our env, using Docker's 'Init Container'
> to get the jar file from a URL. It seems a good idea.
>
> ouywl
> ou...@139.com
>
> 
> Signature by NetEase Mail Master
>
> On 11/19/2019 12:11,Thomas Weise  wrote:
>
> There is a related use case (not specific to HDFS) that I came across:
>
> It would be nice if the jar upload endpoint could accept the URL of a jar
> file as an alternative to the jar file itself. Such a URL could point to an
> artifact repository or a distributed file system.
>
> Thomas
>
>
> On Mon, Nov 18, 2019 at 7:40 PM Yang Wang  wrote:
>
>> Hi tison,
>>
>> Thanks for starting this discussion.
>> * For a user-customized flink-dist jar, it is a useful feature, since it
>> avoids uploading the flink-dist jar
>> every time. Especially in a production environment, it could accelerate the
>> submission process.
>> * For the standard flink-dist jar, FLINK-13938[1] could solve
>> the problem: upload an official Flink release
>> binary to distributed storage (HDFS) first, and then all submissions
>> could benefit from it. Users could
>> also upload a customized flink-dist jar to accelerate their submissions.
>>
>> If the flink-dist jar can be specified as a remote path, maybe the user
>> jar is in the same situation.
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-13938
>>
>> tison wrote on Tue, Nov 19, 2019, 11:17 AM:
>>
>> > Hi folks,
>> >
>> > Recently, our customers have asked for a feature to configure a remote flink jar.
>> > I'd like to reach out to you
>> > to see whether or not it is a general need.
>> >
>> > ATM Flink only supports configuring a local file as the flink jar via the `-yj`
>> > option. If we pass an HDFS file
>> > path, due to an implementation detail it will fail with
>> > IllegalArgumentException. In the story where we support
>> > configuring a remote flink jar, this limitation is eliminated. We also make
>> > use of YARN locality to reduce
>> > uploading overhead, instead asking YARN to localize the jar when
>> > the AM container is started.
>> >
>> > Besides, it possibly has overlap with FLINK-13938. I'd like to put the
>> > discussion on our
>> > mailing list first.
>> >
>> > Are you looking forward to such a feature?
>> >
>> > @Yang Wang: this feature is different from that we discussed offline, it
>> > only focuses on flink jar, not
>> > all ship files.
>> >
>> > Best,
>> > tison.
>> >
>>
>


Re: [DISCUSS] Support configure remote flink jar

2019-11-19 Thread ouywl
I have implemented this feature in our env, using Docker's 'Init Container' to
get the jar file from a URL. It seems a good idea.

ouywl
ou...@139.com
Signature by NetEase Mail Master

On 11/19/2019 12:11, Thomas Weise wrote:

There is a related use case (not specific to HDFS) that I came across:

It would be nice if the jar upload endpoint could accept the URL of a jar
file as an alternative to the jar file itself. Such a URL could point to an
artifact repository or a distributed file system.

Thomas

On Mon, Nov 18, 2019 at 7:40 PM Yang Wang  wrote:

Hi tison,

Thanks for starting this discussion.
* For a user-customized flink-dist jar, it is a useful feature, since it
avoids uploading the flink-dist jar
every time. Especially in a production environment, it could accelerate the
submission process.
* For the standard flink-dist jar, FLINK-13938[1] could solve
the problem: upload an official Flink release
binary to distributed storage (HDFS) first, and then all submissions
could benefit from it. Users could
also upload a customized flink-dist jar to accelerate their submissions.

If the flink-dist jar can be specified as a remote path, maybe the user
jar is in the same situation.

[1]. https://issues.apache.org/jira/browse/FLINK-13938

tison wrote on Tue, Nov 19, 2019, 11:17 AM:

> Hi folks,
>
> Recently, our customers have asked for a feature to configure a remote flink jar.
> I'd like to reach out to you
> to see whether or not it is a general need.
>
> ATM Flink only supports configuring a local file as the flink jar via the `-yj`
> option. If we pass an HDFS file
> path, due to an implementation detail it will fail with
> IllegalArgumentException. In the story where we support
> configuring a remote flink jar, this limitation is eliminated. We also make
> use of YARN locality to reduce
> uploading overhead, instead asking YARN to localize the jar when
> the AM container is started.
>
> Besides, it possibly has overlap with FLINK-13938. I'd like to put the
> discussion on our
> mailing list first.
>
> Are you looking forward to such a feature?
>
> @Yang Wang: this feature is different from that we discussed offline, it
> only focuses on flink jar, not
> all ship files.
>
> Best,
> tison.
>







YARN : Different cutoff for job and task managers

2019-11-19 Thread Gwenhael Pasquiers
Hello,



In a setup where we allocate most of the memory to RocksDB (off-heap) we have
an important cutoff.



Our issue is that the same cutoff applies to both task and job managers : the 
heap size of the job manager then becomes too low.



Is there a way to apply different cutoffs to job and task managers ?



Regards,


Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-19 Thread Robert Metzger
Thanks.

I added a ticket for this nice idea:
https://github.com/ververica/flink-ecosystem/issues/84

On Tue, Nov 19, 2019 at 11:29 AM orips  wrote:

> This is great.
>
> Can we have RSS feed for this?
>
> Thanks
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Local setup, practical differences between standalone JAR vs start-cluster.sh

2019-11-19 Thread Andrea Cardaci
Hi,

I asked the same question on StackOverflow but received no response so
far, so I thought I could also post it here. The original question can
be found at: https://stackoverflow.com/q/58922246/477168 Feel free to
(also) reply there.

For convenience find the original text below.

---

So the code is the same in both approaches and it's roughly the following:

```java
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// prepare the topology...
env.execute();
```

The scenario is Flink running *locally* on a single machine.

## Standalone JAR

`pom.xml` (relevant bits):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.9.0</version>
</dependency>
```

Run with:

```console
java -cp target/My-0.0.0.jar MainClass
```

## `start-cluster.sh`

`pom.xml` (relevant bits):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
```

Run with:

```console
/path/to/flink-1.9.1/bin/flink run -c MainClass target/My-0.0.0.jar
```

---

[This][1] documentation page states:

> The LocalExecutionEnvironment is starting the full Flink runtime, including a 
> JobManager and a TaskManager. These include memory management and all the 
> internal algorithms that are executed in the cluster mode.

This makes me think that there are no practical differences between the
two, but I can't be sure...

Is there anything else I need to consider? Would there be any
differences in terms of performance?

[1]: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/local_execution.html


Best,
Andrea


Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-19 Thread orips
This is great.

Can we have RSS feed for this?

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Re: When using udaf, the startup job has a “Cannot determine simple type name 'com' ” exception(Flink version 1.7.2)

2019-11-19 Thread Robert Metzger
Thanks for your message.

It would be great if you could provide code to reproduce the issue (it does
not have to be your exact code, a simplified example is also fine).
Maybe your program is not directly causing the issue, but it seems that the
code generator is producing something we can not compile.






On Thu, Nov 7, 2019 at 8:49 AM mailtolrl  wrote:

> Hi, vino:
> Thanks for your answer. When I set parallelism to 20, the job runs
> successfully every time. With parallelism 40, sometimes I can submit it
> successfully and sometimes it throws the exception. With parallelism 60,
> it has never been submitted successfully; it always hits this exception. So
> I don't think it has anything to do with my program.
>
>
>
>
> On 2019-11-07 15:20:13, "vino yang" wrote:
>
> Hi mailtolrl,
>
> Can you share more context about your program and UDAF.
>
> Best,
> Vino
>
> mailtolrl wrote on Thu, Nov 7, 2019, 3:05 PM:
>
>> My Flink streaming job uses a UDAF with parallelism 60, submitted in YARN
>> cluster mode, and the error happens every time I start it.
>>
>>
>>
>>
>
>
>
>


Re: Flink configuration at runtime

2019-11-19 Thread Robert Metzger
Hi Amran,
thanks a lot for your message.

I think this is a valid feature request. I've created a JIRA ticket to
track it: https://issues.apache.org/jira/browse/FLINK-14856 (this does not
mean this gets addressed immediately. However, there are currently quite
some improvements to the configuration system in Flink, as part of FLIP-59
and FLIP-81)

Best,
Robert



On Tue, Nov 19, 2019 at 4:09 AM vino yang  wrote:

> Hi Amran,
>
> Change the config option at runtime? No, Flink does not support this
> feature currently.
>
> However, for Flink on YARN job-cluster mode, you can specify different
> config options for different jobs via the program or flink-conf.yaml (copy a new
> Flink binary package, then change its config file).
>
> Best,
> Vino
>
> amran dean wrote on Tue, Nov 19, 2019, 5:53 AM:
>
>> Is it possible to configure certain settings at runtime, on a per-job
>> basis rather than globally within flink-conf.yaml?
>>
>> For example, I have a job where it's desirable to retain a large number
>> of checkpoints via
>> state.checkpoints.num-retained.
>>
>> The checkpoints are cheap, and it's low cost. For other jobs, I don't
>> want such a large number.
>>
>>
>>
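
A sketch of the copy-and-edit workaround vino describes above, reusing the setting from
the question (paths are examples):

  # Give job A its own Flink distribution and configuration
  cp -r flink-1.9.1 flink-1.9.1-jobA
  echo "state.checkpoints.num-retained: 20" >> flink-1.9.1-jobA/conf/flink-conf.yaml

  # Submit job A with its own config; other jobs keep the default distribution
  ./flink-1.9.1-jobA/bin/flink run -m yarn-cluster jobA.jar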


Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-19 Thread vino yang
Hi Robert,

Just added it under the "Tools" category[1].

[1]: https://flink-packages.org/packages/kylin-flink-cube-engine

Best,
Vino

Robert Metzger wrote on Tue, Nov 19, 2019, 4:33 PM:

> Thanks.
> You can add Kylin whenever you think it is ready.
>
> On Tue, Nov 19, 2019 at 9:07 AM vino yang  wrote:
>
> > Thanks Robert. Great job! The web site looks great.
> >
> > In the future, we can also add my Kylin Flink cube engine[1] to the
> > ecosystem projects list.
> >
> > [1]: https://github.com/apache/kylin/tree/engine-flink
> >
> > Best,
> > Vino
> >
> > Oytun Tez wrote on Tue, Nov 19, 2019, 12:09 AM:
> >
> >> Congratulations! This is exciting.
> >>
> >>
> >>  --
> >>
> >> [image: MotaWord]
> >> Oytun Tez
> >> M O T A W O R D | CTO & Co-Founder
> >> oy...@motaword.com
> >>
> >>   
> >>
> >>
> >> On Mon, Nov 18, 2019 at 11:07 AM Robert Metzger 
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I would like to announce that Ververica, with the permission of the
> >>> Flink PMC, is launching a website called flink-packages.org. This goes
> >>> back to an effort proposed earlier in 2019 [1]
> >>> The idea of the site is to help developers building extensions /
> >>> connectors / API etc. for Flink to get attention for their project.
> >>> At the same time, we want to help Flink users to find those ecosystem
> >>> projects, so that they can benefit from the work. A voting and
> commenting
> >>> functionality allows users to rate and discuss individual
> >>> packages.
> >>>
> >>> You can find the website here: https://flink-packages.org/
> >>>
> >>> The full announcement is available here:
> >>> https://www.ververica.com/blog/announcing-flink-community-packages
> >>>
> >>> I'm happy to hear any feedback about the site.
> >>>
> >>> Best,
> >>> Robert
> >>>
> >>>
> >>> [1]
> >>>
> https://lists.apache.org/thread.html/c306b8b6d5d2ca071071b634d647f47769760e1e91cd758f52a62c93@%3Cdev.flink.apache.org%3E
> >>>
> >>
>


Elastic search sink error handling

2019-11-19 Thread Nicholas Walton
HI,

I need help with handling errors with the elasticsearch sink as below

2019-11-19 08:09:09,043 ERROR 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase  - 
Failed Elasticsearch item request: 
[flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]]
 ElasticsearchException[Elasticsearch exception 
[type=version_conflict_engine_exception, reason=[75:108]: version conflict, 
document already exists (current version [1])]]
[flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]]
 ElasticsearchException[Elasticsearch exception 
[type=version_conflict_engine_exception, reason=[75:108]: version conflict, 
document already exists (current version [1])]]
at 
org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
at 
org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
at 
org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
at 
org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
at 
org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
at 
org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
at 
org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
at 
org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
at 
org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at 
org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
at 
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at 
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at 
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:748)

The error is expected, since I am creating documents with deterministic, duplicate ids: I
may reload a previous batch that was only partially loaded (so only the new documents are
actually indexed), or, after a timeout, I may upload a document twice to ensure it is
definitely indexed and not lost.

The document is created as

  val json = new util.HashMap[String, Any]
  json.put("arrayinstance", esid)
  json.put("bearing", element._1)
  json.put("sampleindex", element._2)
  json.put("sample", element._3)
  json.put("hashstring", element._4)
  json.put("priorrepeats", element._5)
  return Requests.indexRequest()
.index("flink-index-deduplicated")
.`type`("_doc")
.id(element._1+":"+element._2)
.create(true)
.source(json)
}

My problem is: how can I catch the failure, recover and carry on? I have set a
failure handler as below, which will need extending to handle the failure above.

esSinkBuilder.setFailureHandler(
  new ActionRequestFailureHandler() {
@throws(classOf[Throwable])
@Override
override def onFailure(action: ActionRequest, failure: Throwable, 
restStatusCode: Int, indexer: RequestIndexer) {

  if (ExceptionUtils.findThrowable(failure, 
classOf[EsRejectedExecutionException]).isPresent) {
Job.LOG.info("ElasticSearch full queue; re-added document for 
indexing")
indexer.add(action)
  } else if (ExceptionUtils.findThrowable(failure, 
classOf[ElasticsearchParseException]).isPresent) {
LOG.info("Malformed ElasticSearch document. Document dropped")
  } else if 
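
A hedged Scala sketch of how the version-conflict case above could be handled in such a
handler; the HTTP 409 check and the class name are assumptions, not part of the original
code:

  import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
  import org.apache.flink.util.ExceptionUtils
  import org.elasticsearch.action.ActionRequest
  import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

  class DedupFailureHandler extends ActionRequestFailureHandler {
    override def onFailure(action: ActionRequest, failure: Throwable,
                           restStatusCode: Int, indexer: RequestIndexer): Unit = {
      if (restStatusCode == 409) {
        // Version conflict: a document with this id already exists, which is
        // expected for intentional re-uploads, so drop the request.
      } else if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
        indexer.add(action) // queue full: re-add the request for retry
      } else {
        throw failure // anything else should still fail the sink
      }
    }
  }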

Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-19 Thread Robert Metzger
Thanks.
You can add Kylin whenever you think it is ready.

On Tue, Nov 19, 2019 at 9:07 AM vino yang  wrote:

> Thanks Robert. Great job! The web site looks great.
>
> In the future, we can also add my Kylin Flink cube engine[1] to the
> ecosystem projects list.
>
> [1]: https://github.com/apache/kylin/tree/engine-flink
>
> Best,
> Vino
>
> Oytun Tez wrote on Tue, Nov 19, 2019, 12:09 AM:
>
>> Congratulations! This is exciting.
>>
>>
>>  --
>>
>> [image: MotaWord]
>> Oytun Tez
>> M O T A W O R D | CTO & Co-Founder
>> oy...@motaword.com
>>
>>   
>>
>>
>> On Mon, Nov 18, 2019 at 11:07 AM Robert Metzger 
>> wrote:
>>
>>> Hi all,
>>>
>>> I would like to announce that Ververica, with the permission of the
>>> Flink PMC, is launching a website called flink-packages.org. This goes
>>> back to an effort proposed earlier in 2019 [1]
>>> The idea of the site is to help developers building extensions /
>>> connectors / API etc. for Flink to get attention for their project.
>>> At the same time, we want to help Flink users to find those ecosystem
>>> projects, so that they can benefit from the work. A voting and commenting
>>> functionality allows users to rate and discuss individual
>>> packages.
>>>
>>> You can find the website here: https://flink-packages.org/
>>>
>>> The full announcement is available here:
>>> https://www.ververica.com/blog/announcing-flink-community-packages
>>>
>>> I'm happy to hear any feedback about the site.
>>>
>>> Best,
>>> Robert
>>>
>>>
>>> [1]
>>> https://lists.apache.org/thread.html/c306b8b6d5d2ca071071b634d647f47769760e1e91cd758f52a62c93@%3Cdev.flink.apache.org%3E
>>>
>>


Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-19 Thread vino yang
Thanks Robert. Great job! The web site looks great.

In the future, we can also add my Kylin Flink cube engine[1] to the
ecosystem projects list.

[1]: https://github.com/apache/kylin/tree/engine-flink

Best,
Vino

Oytun Tez wrote on Tue, Nov 19, 2019, 12:09 AM:

> Congratulations! This is exciting.
>
>
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>
>
> On Mon, Nov 18, 2019 at 11:07 AM Robert Metzger 
> wrote:
>
>> Hi all,
>>
>> I would like to announce that Ververica, with the permission of the Flink
>> PMC, is launching a website called flink-packages.org. This goes back to
>> an effort proposed earlier in 2019 [1]
>> The idea of the site is to help developers building extensions /
>> connectors / API etc. for Flink to get attention for their project.
>> At the same time, we want to help Flink users to find those ecosystem
>> projects, so that they can benefit from the work. A voting and commenting
>> functionality allows users to rate and discuss individual
>> packages.
>>
>> You can find the website here: https://flink-packages.org/
>>
>> The full announcement is available here:
>> https://www.ververica.com/blog/announcing-flink-community-packages
>>
>> I'm happy to hear any feedback about the site.
>>
>> Best,
>> Robert
>>
>>
>> [1]
>> https://lists.apache.org/thread.html/c306b8b6d5d2ca071071b634d647f47769760e1e91cd758f52a62c93@%3Cdev.flink.apache.org%3E
>>
>