Re: Autoscaling vs backpressure

2020-05-07 Thread Arvid Heise
Hi Manish, while you could use backpressure and the resulting consumer lag to throttle the source and keep processing lag to a minimum, I'd personally see only very limited value. It assumes that you have an architecture where you can influence the input rate, which is probably only true if you ge

async IO in UDFs

2020-05-07 Thread lec ssmi
Hi: Is there any way to implement async IO in UDFs (scalar function, table function, aggregate function)?

Re: async IO in UDFs

2020-05-07 Thread Benchao Li
Hi, AFAIK, there is no way to do this for now. This needs the operators running UDFs to support async IO. lec ssmi wrote on Thu, May 7, 2020 at 3:23 PM: > Hi: > Is there any way to implement async IO in UDFs (scalar function, > table function, aggregate function)? > -- Benchao Li School of Electron
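For reference, the async I/O support that does exist in Flink lives at the DataStream level (AsyncDataStream plus a RichAsyncFunction) rather than inside SQL/Table UDFs. A minimal sketch of that route, where DatabaseClient and its lookup() method are hypothetical placeholders for an actual async client:

    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    // Sketch of DataStream-level async I/O; DatabaseClient is a hypothetical async client.
    public class AsyncLookupFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

        private transient DatabaseClient client;

        @Override
        public void open(Configuration parameters) {
            client = new DatabaseClient();   // hypothetical: creates the async client
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture) {
            client.lookup(key)               // hypothetical: returns CompletableFuture<String>
                  .thenAccept(value ->
                      resultFuture.complete(Collections.singleton(Tuple2.of(key, value))));
        }
    }

    // Wiring it into a pipeline (timeout and capacity values are illustrative):
    // AsyncDataStream.unorderedWait(input, new AsyncLookupFunction(), 1000, TimeUnit.MILLISECONDS, 100);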

Re: Statefun 2.0 questions

2020-05-07 Thread Igal Shilman
Hi Wouter! Glad to read that you have been using Flink for quite some time and are also exploring StateFun! 1) Yes, that is correct, and you can follow the Dockerhub contribution PR at [1]. 2) I'm not sure I understand what you mean by triggering from the browser. If you mean, for testing / illustration

flink how to access remote hdfs using namenode nameservice

2020-05-07 Thread wangl...@geekplus.com.cn
According to https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html I am deploying a standalone cluster with JobManager HA and need the HDFS address: high-availability.storageDir: hdfs:///flink/recovery My Hadoop is a remote cluster. I can write it a

Re: checkpointing opening too many file

2020-05-07 Thread David Anderson
With the FsStateBackend you could also try increasing the value of state.backend.fs.memory-threshold [1]. Only those state chunks that are larger than this value are stored in separate files; smaller chunks go into the checkpoint metadata file. The default is 1KB; increasing this should reduce file
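For illustration, the same threshold can also be set programmatically on the state backend; a rough sketch, where the checkpoint URI and the 100 KB value are purely illustrative:

    import java.net.URI;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointThresholdExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Programmatic counterpart of state.backend.fs.memory-threshold:
            // state chunks below this size are inlined into the checkpoint metadata
            // file instead of being written out as separate small files.
            int fileStateSizeThreshold = 100 * 1024; // 100 KB, purely illustrative
            env.setStateBackend(new FsStateBackend(
                    URI.create("s3://my-bucket/checkpoints"),  // hypothetical checkpoint URI
                    fileStateSizeThreshold));
        }
    }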

Re: Window processing in Stateful Functions

2020-05-07 Thread Igal Shilman
Hi all, DataStream windows are not yet supported in StateFun, but it seems like the main motivation here is to purge old edges? If that is the case, perhaps we need to integrate state TTL [1] into persisted values/persisted tables. An alternative approach would be to implement a tumbling window p
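For context, the state TTL referred to in [1] is the mechanism the DataStream API already provides. A rough sketch of how it is attached to a state descriptor there, with illustrative names and an illustrative retention period (typically placed in a RichFunction's open()):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    // Expire state entries 24 hours after they were created or last written.
    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(24))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ValueStateDescriptor<Long> edgeSeenAt =
            new ValueStateDescriptor<>("edge-seen-at", Long.class);  // illustrative name/type
    edgeSeenAt.enableTimeToLive(ttlConfig);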

Re: Statefun 2.0 questions

2020-05-07 Thread Wouter Zorgdrager
Hi Igal, Thanks for your quick reply. Getting back to point 2, I was wondering if you could indeed trigger a stateful function directly from Flask and also get the reply there, instead of using Kafka in between. We want to experiment with running stateful functions behind a front-end (which should be ab

Re: Rich Function Thread Safety

2020-05-07 Thread tao xiao
As the Javadoc suggests, it seems the operator methods and the checkpoint snapshot are accessed by two different threads: https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java#L39-L62 On Thu, May 7, 2020 at
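A rough paraphrase of the pattern that Javadoc illustrates, with the shared counter only touched under the checkpoint lock (the counting logic itself is illustrative):

    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // run() executes on the task thread while snapshotState() is called from the
    // checkpointing machinery, so the shared field is only mutated under the checkpoint lock.
    public class CountingSource implements SourceFunction<Long>, ListCheckpointed<Long> {

        private volatile boolean running = true;
        private long counter = 0;   // shared between run() and snapshotState()

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter);
                    counter++;
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) {
            return Collections.singletonList(counter);
        }

        @Override
        public void restoreState(List<Long> state) {
            for (Long c : state) {
                counter = c;
            }
        }
    }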

Correctly implementing of SourceFunction.run()

2020-05-07 Thread Senthil Kumar
I am implementing a source function which periodically wakes up and consumes data from S3. My current implementation is as follows, following org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction. Is it safe to simply swallow any and all exceptions in the run method

Re: Correctly implementing of SourceFunction.run()

2020-05-07 Thread Jingsong Li
Hi, Some suggestions from my side: - Wrap the work and ctx.collect in synchronized (checkpointLock)? - Move Thread.sleep(interval) out of the try/catch? You probably should not swallow the InterruptedException (e.g. when the job is cancelled). Best, Jingsong Lee On Fri, May 8, 2020 at 2:52 AM Senthil Kumar wrote: > I am impl
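A minimal sketch of a run() loop along those lines; running, intervalMillis, LOG and pollS3() are assumed members of the surrounding SourceFunction, and pollS3() stands in for the actual S3 read:

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            try {
                List<String> records = pollS3();           // hypothetical fetch from S3
                synchronized (ctx.getCheckpointLock()) {   // emit records under the checkpoint lock
                    for (String record : records) {
                        ctx.collect(record);
                    }
                }
            } catch (IOException e) {
                // swallow/log only the exceptions expected from the S3 read itself
                LOG.warn("Failed to read from S3, will retry on the next interval", e);
            }
            // sleep outside the try/catch so an InterruptedException (e.g. job cancellation)
            // is not swallowed and can stop the source
            Thread.sleep(intervalMillis);
        }
    }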

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-07 Thread Congxian Qiu
Hi, Yes, there should only be files used by checkpoints 8, 9 and 10 in the checkpoint directory, but you cannot delete files created more than 3 minutes ago (because checkpoints 8, 9 and 10 may reuse files created by previous checkpoints; this is how incremental checkpointing works [1]). You can a

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-07 Thread Trystan
Aha, so incremental checkpointing *does* rely on state from arbitrarily old checkpoints, regardless of the number of retained checkpoints. The documentation wasn't entirely clear about this. One would assume that if you retain 3 checkpoints, anything older than the 3rd is irrelevant, but that's evident

What's the best practice to determine whether a job has finished or not?

2020-05-07 Thread Caizhi Weng
Hi dear Flink community, I would like to determine in my code whether a job has finished (whether successfully or exceptionally). I used to think that JobClient#getJobStatus was a good idea, but I found that it behaves quite differently under different execution environments. For example, under
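For what it's worth, a sketch of polling a JobClient (e.g. one obtained from executeAsync()) until the job reaches a globally terminal state; the utility class and the polling interval are illustrative, and as noted above the observed behaviour can still differ per execution environment:

    import org.apache.flink.api.common.JobStatus;   // lives in org.apache.flink.runtime.jobgraph in older releases
    import org.apache.flink.core.execution.JobClient;

    public final class JobCompletionUtil {
        // Poll until the job reaches a globally terminal state (FINISHED, FAILED, CANCELED).
        public static JobStatus waitForTermination(JobClient jobClient) throws Exception {
            while (true) {
                JobStatus status = jobClient.getJobStatus().get();
                if (status.isGloballyTerminalState()) {
                    return status;
                }
                Thread.sleep(1000);   // arbitrary 1-second polling interval
            }
        }
    }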

Re: flink how to access remote hdfs using namenode nameservice

2020-05-07 Thread Yang Wang
Do you mean to use the HDFS nameservice? You could find it with the config key "dfs.nameservices" in hdfs-site.xml. For example, hdfs://myhdfs/flink/recovery. Please keep in mind that you need to set the HADOOP_CONF_DIR environment variable beforehand. Best, Yang wangl...@geekplus.com.cn wrote on Thu, May 7, 2020 at 5:0
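For illustration, the pieces would fit together roughly like this, assuming the nameservice is called myhdfs and the Hadoop configuration lives under /etc/hadoop/conf (both illustrative):

    # hdfs-site.xml (Hadoop side) defines the nameservice, e.g.:
    #   <property><name>dfs.nameservices</name><value>myhdfs</value></property>

    # flink-conf.yaml then references the nameservice instead of a single NameNode address:
    high-availability.storageDir: hdfs://myhdfs/flink/recovery

    # and the Flink processes need the Hadoop configuration available, e.g.:
    export HADOOP_CONF_DIR=/etc/hadoop/conf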