Re: Question about Flink's locally cached files being deleted

2019-10-10 Thread 戴嘉诚
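Hello, my job stores its checkpoints in RocksDB, and the error was reported after the job had been running for a while. From: pengchenglin Sent: 2019-10-11 11:59 To: user-zh@flink.apache.org Subject: Re: Question about Flink's locally cached files being deleted Hello, does your job store its checkpoints in RocksDB? Does it report this error every time it starts, or only after running for a while? From: 戴嘉诚 Sent: 2019-10-11 11:00 To: user-zh@flink.apache.org Subject: Question about Flink's locally cached files being deleted Hi everyone: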

Re: Where are uploaded Job jars stored?

2019-10-10 Thread Steven Nelson
John, I think you are referring to the web upload directory. There is a setting for that folder ‘web.upload.dir’. If you set that to a folder writeable to both masters it will work as desired. I used an NFS mount (AWS EFS). -Steve Sent from my iPhone > On Oct 10, 2019, at 10:11 PM, Zhu Zhu
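As a rough illustration of the suggestion above, the setting would go into flink-conf.yaml on every master. The path below is a hypothetical shared mount, not one taken from the thread; any directory writable by all masters (for example an NFS or EFS mount) should serve.

```yaml
# flink-conf.yaml
# Hypothetical shared mount point; replace with a directory all masters can write to.
web.upload.dir: /mnt/flink-shared/web-upload
```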

Re: Fencing token exceptions from Job Manager High Availability mode

2019-10-10 Thread Joshua Fan
Sorry, I forgot to mention the version: we run Flink 1.7 on YARN in HA mode. On Fri, Oct 11, 2019 at 12:02 PM Joshua Fan wrote: > Hi Till > > After getting your advice, I checked the log again. It does not seem to be wholly the > same as the condition you mentioned. > > I would like to summarize the story in the

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yun Tang
Hi Hao It seems that I misunderstood the background of your use case. High availability configuration targets fault tolerance, not general development evolution. If you want to change your job topology, just follow the general rule and restore from a savepoint/checkpoint, do not

Re: Batch Job in a Flink 1.9 Standalone Cluster

2019-10-10 Thread Xintong Song
I think it depends on your configuration. - Are you using on-heap/off-heap managed memory? (configured by 'taskmanager.memory.off-heap', false by default) - Is managed memory pre-allocated? (configured by 'taskmanager.memory.preallocate', false by default) If managed memory is
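For reference, the two options mentioned above could be written out in flink-conf.yaml as follows. The values shown are the defaults stated in the reply; this is only a sketch for Flink 1.9, whose memory options differ from those of later releases.

```yaml
# flink-conf.yaml (Flink 1.9)
# false = managed memory is allocated on the JVM heap
taskmanager.memory.off-heap: false
# false = managed memory is allocated lazily rather than at TaskManager startup
taskmanager.memory.preallocate: false
```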

Re: Fencing token exceptions from Job Manager High Availability mode

2019-10-10 Thread Joshua Fan
Hi Till After getting your advice, I checked the log again. It does not seem to be wholly the same as the condition you mentioned. I would like to summarize the story in the log below. At one point, the zk connection was not stable, so it went through suspended-reconnected 3 times. After the first

Re: Question about Flink's locally cached files being deleted

2019-10-10 Thread pengchenglin
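Hello, does your job store its checkpoints in RocksDB? Does it report this error every time it starts, or only after running for a while? From: 戴嘉诚 Sent: 2019-10-11 11:00 To: user-zh@flink.apache.org Subject: Question about Flink's locally cached files being deleted Hi everyone: Since I recently migrated my programs to a Flink 1.9 on YARN session, one of them, which reads Kafka messages, groups them by key, aggregates them in a one-second window, and then asynchronously batch-writes them to HBase, frequently throws exceptions. The exception is: java.lang.Exception: Exception while creating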

Re: Batch Job in a Flink 1.9 Standalone Cluster

2019-10-10 Thread Yang Wang
Hi Tim, Do you mean the user heap memory used by the tasks of finished jobs is not freed up? If this were the case, the memory usage of the taskmanager would increase as more and more jobs finish. However, this does not happen; the memory will be freed up by JVM GC. BTW, Flink has its own memory

Re: Where are uploaded Job jars stored?

2019-10-10 Thread Zhu Zhu
Hi John, Not sure why you need to know the location of uploaded job jars? The job jar will be automatically localized to a taskmanager via BlobService when a task belonging to the job is running on that taskmanager. The localization dir is blob.storage.directory. If it is not specified, it will
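A minimal sketch of pinning the localization directory mentioned above in flink-conf.yaml. The path is a hypothetical example and does not come from the thread.

```yaml
# flink-conf.yaml
# Hypothetical local directory where the BlobService localizes job jars.
blob.storage.directory: /var/lib/flink/blob-storage
```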

Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Yun Tang
A checkpoint can only complete if your job has not failed. Since checkpoint barriers are injected together with the messages, if the problematic message causes your job to fail, you cannot complete any checkpoint after that problematic message is processed. In other words, you could always resume

Re: Flink 1.8.0 S3 Checkpoints fail with java.security.ProviderException: Could not initialize NSS

2019-10-10 Thread Hao Sun
I saw similar issue when using alpine linux. https://pkgs.alpinelinux.org/package/v3.3/main/x86/nss Installing this package fixed my problem Hao Sun On Thu, Oct 10, 2019 at 3:46 PM Austin Cawley-Edwards < austin.caw...@gmail.com> wrote: > Hi there, > > I'm getting the following error message

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Hao Sun
Yep, I know that option. That's what got me confused as well. In an HA setup, where do I supply this option (allowNonRestoredState)? As I remember, this option requires a savepoint path when I start a Flink job, and HA does not require the path. Hao Sun On Thu, Oct 10, 2019 at 11:16 AM Yun Tang

Flink 1.8.0 S3 Checkpoints fail with java.security.ProviderException: Could not initialize NSS

2019-10-10 Thread Austin Cawley-Edwards
Hi there, I'm getting the following error message on a Flink 1.8 cluster deployed on Kubernetes. I've already confirmed that the pod has access to S3 and write permissions to the bucket, but I can't understand what the SSL issue is and if it is related to S3 or not. I have tried both with the

Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Flavio Pompermaier
Sorry for the dumb question, but let's suppose I do not use retained checkpoints and my job has processed billions of messages from Kafka. Then a problematic message causes my job to fail..am I able to complete a savepoint to fix the job and restart from the problematic message (i.e. last "valid" kafka

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-10 Thread Filip Niksic
Hi Theo, Isn't the solution I proposed exactly the solution you talk about? Read the stream sequentially, assign punctuated watermarks, keyBy to achieve parallelism. Perhaps you're reading too much into my question. When I sent the first email, I didn't even know about punctuated watermarks.

Re: Async and checkpointing

2019-10-10 Thread anurag
Hi Yun, Thanks for your reply. I am sorry if I was not clear. What I meant was that as records are processed, the checkpoint and the corresponding stream position will move. But in case of async with unordered, will the checkpoint and the corresponding stream position move in the above scenario

Re: Where are uploaded Job jars stored?

2019-10-10 Thread John Smith
And can that folder be shared so that all nodes see it? On Thu, 10 Oct 2019 at 14:36, Yun Tang wrote: > Hi John > > The jar is not stored in HA path, I think the answer [1] could help you. > > [1] >

Re: Where are uploaded Job jars stored?

2019-10-10 Thread Yun Tang
Hi John The jar is not stored in HA path, I think the answer [1] could help you. [1] https://stackoverflow.com/questions/51936608/where-can-i-find-my-jar-on-apache-flink-server-which-i-submitted-using-apache-fl Best Yun Tang From: John Smith Sent: Friday,

Re: Best coding practises guide while programming using flink apis

2019-10-10 Thread Yun Tang
Hi Terry Flink has a code-style and quality guide for contributing code [1]; this might not be directly what you want, but I hope it helps a bit. As more and more big data systems recommend high-level and declarative APIs such as SQL and Table API [2], I think GOF design patterns might not play an

Re: Async and checkpointing

2019-10-10 Thread Yun Tang
Hi Anurag What do you mean by "will the checkpoint pointer move at all or not"? If one of your threads failed to send a record, and that caused the sub-task to fail, it would lead to job failover. On job failover, any ongoing checkpoint would be aborted and the job would then just restore

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yun Tang
Just a minor supplement @Hao Sun, if you decide to drop an operator, don't forget to add the --allowNonRestoredState (short: -n) option [1] [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state Best Yun Tang

Where are uploaded Job jars stored?

2019-10-10 Thread John Smith
Hi using 1.8.0 running on standalone cluster with Zookeeper HA. Are job JARs stored at: high-availability.storageDir ??? The thing is when you browse the individual nodes at port 8080 to go submit the job only the node where you uploaded the JAR has it. - Go to any given node - Upload a jar -

Async and checkpointing

2019-10-10 Thread anurag
Hi All, Thanks for your help in advance. I am using async I/O with Flink 1.5.5 and the AsyncDataStream.unorderedWait method, with capacity set to 100. My question is: since my capacity is 100, each thread will be processing one record. Say the sequence numbers of my records are S1,S2S100

Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Yun Tang
Hi Vishwas Imagine this scenario: your last committed offset is A with a savepoint-A, and then you just stop this job and try some new program logic, such as printing your output instead of writing to the previous sink, to do some experiments. The new experimental job might commit offset-B to kafka.

Re: Re:Memory constrains running Flink on Kubernetes

2019-10-10 Thread Yun Tang
Hi Shengjk1 setBlockCacheSize, setWriteBufferSize and setMaxWriteBufferNumber could help you to control memory usage. However, Flink stores each state in its own column family, which increases the number of column families, and each column family has its own write buffer. FRocksDB [1] already plans to fix

Re: Difference between windows in Spark and Flink

2019-10-10 Thread Yun Tang
Hi Felipe Generally speaking, the key difference which impacts the performance is where they store data within windows. For Flink, it would store data and its related time-stamp within windows in state backend[1]. Once window is triggered, it would pull all the stored timer with coupled

Re: Backpressure tuning/failure

2019-10-10 Thread Owen Rees-Hayward
Hey Piotr, I think we are broadly in agreement, hopefully. So out of the three scenarios you describe, the code is simulating scenario 2). The only additional comment I would make to this is that the additional load on a node could be an independent service or job. I am guessing we can agree,

Re: Backpressure tuning/failure

2019-10-10 Thread Piotr Nowojski
Hi Owen, Thanks for the quick response. No, I haven’t seen the previous blog post, yes it clears the things out a bit. > To clarify, the code is attempting to simulate a straggler node due to high > load, which therefore processes data at a slower rate - not a failing node. > Some degree of

Re: Backpressure tuning/failure

2019-10-10 Thread Owen Rees-Hayward
Hi Piotr, Thanks for getting back to me and for the info. I try to describe the motivation around the scenarios in the original post in the series - see the 'Backpressure - why you might care' section on http://owenrh.me.uk/blog/2019/09/30/. Maybe it could have been clearer. As you note, this

Re: Discussion: savepoint still references the original cluster's address after Flink cluster migration

2019-10-10 Thread Congxian Qiu
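Hello, a savepoint currently stores the absolute paths of its files, so moving a savepoint is not supported for now. If you really need to migrate, you can write your own program that rewrites the addresses in the savepoint's meta and then generates a new meta file. Best, Congxian On Thu, Oct 10, 2019 at 5:31 PM pengchengl...@163.com wrote: > Hello, after the savepoint was created, did you move it somewhere else? Officially, savepoints cannot be moved. > > > From: 蒋涛涛 > Sent: 2019-10-10 17:13 > To: user-zh > Subject: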

Difference between windows in Spark and Flink

2019-10-10 Thread Felipe Gutierrez
Hi all, I am trying to think about the essential differences between operators in Flink and Spark. Especially when I am using Keyed Windows then a reduce operation. In Flink we develop an application that can logically separate these two operators. Thus after a keyed window I can use

Re:Memory constrains running Flink on Kubernetes

2019-10-10 Thread shengjk1
+1 I also encountered a similar problem, but I run a Flink application that uses state in RocksDB on YARN. The YARN container was killed because of OOM. I also read the RocksDB tuning guide [1] and tuned some parameters, but it did not help, such as: class MyOptions1 implements OptionsFactory { @Override public

Re: Passing parameters to filter function (in DataStreams)

2019-10-10 Thread Theo Diefenthal
Hi, Your original post looks like "computeThreshold" doesn't require any parameters, but is just an expensive to compute operation. In that case, you can inherit from "RichFilterFunction" instead of "FilterFunction". In case of "RichFilterFunction", you can override the "open"-method and

Batch Job in a Flink 1.9 Standalone Cluster

2019-10-10 Thread Timothy Victor
After a batch job finishes in a Flink standalone cluster, I notice that the memory isn't freed up. I understand Flink uses its own memory manager and just allocates a large tenured byte array that is not GC'ed. But does the memory used in this byte array get released when the batch job is

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-10 Thread Theo Diefenthal
Hi Filip, My point was not about the computation of the "maximum". My point was: You could hopefully read the stream sequentially and just assign punctuated watermarks to it. Once you have assigned the watermarks properly (And before you do your expensive computation, like in this case

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Vijay Bhaskar
Thanks Yang. We will try and let you know if any issues arise Regards Bhaskar On Thu, Oct 10, 2019 at 1:53 PM Yang Wang wrote: > @ Hao Sun, > I have made a confirmation that even we change parallelism and/or modify > operators, add new operators, > the flink cluster could also recover from

Re: Flink metrics reporters documentation

2019-10-10 Thread Aleksey Pak
Hi Flavio, Below is my explanation to your question, based on anecdotal evidence: As you may know, Flink distribution package is already scala version specific and bundles some jar artifacts. User Flink job is supposed to be compiled against some of those jars (with maven's `provided` scope).

Re: Discussion: savepoint still references the original cluster's address after Flink cluster migration

2019-10-10 Thread pengchengl...@163.com
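Hello, after the savepoint was created, did you move it somewhere else? Officially, savepoints cannot be moved. From: 蒋涛涛 Sent: 2019-10-10 17:13 To: user-zh Subject: Discussion: savepoint still references the original cluster's address after Flink cluster migration Hi all, while migrating a Flink cluster (Flink 1.6.2), I took a savepoint, moved it to the new HDFS cluster, and tried to restore from it. The restore failed with an error saying the previous HDFS cluster could not be found. The error is: java.lang.Exception: Exception while creating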

Re: Discussion: savepoint still references the original cluster's address after Flink cluster migration

2019-10-10 Thread Qi Kang
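Hi, this is because the savepoint's _metadata file stores the absolute paths of the savepoint files rather than relative paths, so the restore still looks for the old cluster. According to the available material, savepoints still cannot be relocated. See [1] and [2] for details. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#triggering-savepoints [2] https://issues.apache.org/jira/browse/FLINK-5763 BR, Qi Kang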

Re: Backpressure tuning/failure

2019-10-10 Thread Piotr Nowojski
Hi, I’m not entirely sure what you are testing. I have looked at your code (only the constant straggler scenario) and, please correct me if I’m wrong, in your job you are basically measuring throughput of `Thread.sleep(straggler.waitMillis)`. In the first RichMap task (`subTaskId == 0`), per

Discussion: savepoint still references the original cluster's address after Flink cluster migration

2019-10-10 Thread 蒋涛涛
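Hi all, while migrating a Flink cluster (Flink 1.6.2), I took a savepoint, moved it to the new HDFS cluster, and tried to restore from it. The restore failed with an error saying the previous HDFS cluster could not be found. The error is: java.lang.Exception: Exception while creating StreamOperatorStateContext. at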

Re: Flink metrics reporters documentation

2019-10-10 Thread Flavio Pompermaier
Sorry, I just discovered that those jars are actually in the opt folder within Flink dist..however the second point still holds: why there's a single influxdb jar inside flink's opt jar while on maven there are 2 versions (one for scala 2.11 and one for 2.12)? Best, Flavio On Thu, Oct 10, 2019

Flink metrics reporters documentation

2019-10-10 Thread Flavio Pompermaier
Hi to all, I was trying to configure monitoring on my cluster so I went to the metric reporters documentation. There are 2 things that are not clear to me: 1. In all reporters the documentation says to take the jars from /opt folder..obviously this is not true. Wouldn't it be better to provide

Re: Flink 1.9 web UI exception log display issue

2019-10-10 Thread 戴嘉诚
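+1, I ran into this problem too. The main cause is that after an exception the region restarts; after the restart the page reloads, which automatically clears the exception log information..so exceptions can no longer be investigated in the web UI. From: 李杰 Sent: 2019-10-10 14:41 To: user-zh@flink.apache.org Subject: Flink 1.9 web UI exception log display issue log4j.properties is the official default. The exception log in the web UI flashes by and disappears; historical exceptions are not visible in the UI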

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yang Wang
@ Hao Sun, I have confirmed that even if we change parallelism and/or modify operators or add new operators, the Flink cluster can still recover from the latest checkpoint. @ Vijay a) Some individual jobmanager/taskmanager crashed exceptionally (some other jobmanagers and taskmanagers are alive),

Flink checkpoint timeout problem

2019-10-10 Thread lg
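Hi everyone: I ran into the following problem while using Flink. I am new to Flink and not sure how to go about troubleshooting, so I would appreciate some pointers. Thanks. Scenario: I set up a Flink cluster in Standalone Cluster mode, with Task Managers=10 and Task Slots=54. The streaming pipeline is roughly: data from Kafka topic1 --> asynchronous calls to an external resource to enrich the data --> written to Kafka topic2.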

Re: State & Fault Tolerance in Table API and SQL Implementations

2019-10-10 Thread Dawid Wysakowicz
Hi Vaibhav, I am not sure if there are specific documentation parts about state handling in Table API. There are just a few important facts that you must be aware of: * in a failover scenario, everything should work just fine, internally Table API uses Flink's state and all intermediate results

Flink 1.9 web UI exception log display issue

2019-10-10 Thread 李杰
log4j.properties is the official default. The exception log in the web UI flashes by and disappears; historical exceptions are not visible in the UI [image: 1.jpg] [image: 2.jpg]

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Vijay Bhaskar
Thanks Yang and Sean. I have a couple of questions: 1) Suppose the scenario of bringing back the entire cluster: a) In that case, at least one job manager out of the HA group should be up and running, right? or b) If all the job managers fail, does this also work? In that case please let me know