Re: Status of FLINK-12692 (Support disk spilling in HeapKeyedStateBackend)

2020-01-30 Thread Yu Li
btw, the upstreaming work has lagged but is not stalled, and the biggest PR (with more than 6k lines of code) [1] [2] was already merged in Nov. 2019. We are sorry about the delay, but we're still actively working on this feature, JFYI (smile). Best Regards, Yu [1]

Re: Status of FLINK-12692 (Support disk spilling in HeapKeyedStateBackend)

2020-01-30 Thread Yu Li
Hi Ken, Thanks for watching this feature. Unfortunately, yes, this didn't make it into 1.10, and we will try our best to complete the upstreaming work in 1.11.0. In the meantime, we are actively preparing a try-out version and will publish it on flink-packages [1] once ready. Will also get back here once

Re: Difference between JobManager and JobMaster

2020-01-30 Thread Yun Tang
Hi Weizheng, FLIP-6 [1] introduced a new implementation of the master component in a Flink cluster, named `JobMaster.java` [2]; you can find the previous implementation in Flink 1.7 as `JobManager.scala` [3]. However, we still call the master component 'JobManager' even though its implementation

Re: Sorting bounded data stream

2020-01-30 Thread Jingsong Li
Hi, If you are using checkpoints, I think a simple way is to use a ListState to store all incoming records, and in endInput(), drain all records from the ListState into a sorter to sort them. Best, Jingsong Lee On Fri, Jan 31, 2020 at 3:10 PM Łukasz Jędrzejewski wrote: > Hi, > > Thank you very
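A minimal sketch of the approach described above, assuming Flink 1.9+: a custom one-input operator that buffers records in operator ListState and emits them sorted from endInput(). The class and state names are hypothetical, and wiring the operator into the pipeline via transform() is omitted.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Buffers every incoming Long in checkpointed operator state and emits
// the sorted result once the bounded input signals end-of-input.
public class SortOnEndOfInput extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long>, BoundedOneInput {

    private transient ListState<Long> buffer;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // Operator state survives checkpoints, so no records are lost on recovery.
        buffer = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffer", Long.class));
    }

    @Override
    public void processElement(StreamRecord<Long> element) throws Exception {
        buffer.add(element.getValue());
    }

    @Override
    public void endInput() throws Exception {
        // Drain the state into an in-memory sorter and emit in order.
        List<Long> all = new ArrayList<>();
        for (Long value : buffer.get()) {
            all.add(value);
        }
        all.sort(Comparator.naturalOrder());
        for (Long value : all) {
            output.collect(new StreamRecord<>(value));
        }
    }
}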

Re: Sorting bounded data stream

2020-01-30 Thread Łukasz Jędrzejewski
Hi, Thank you very much for the suggestions. I will check out the UnilateralSortMerger. However, in our case we are using checkpoints. Kind regards, Łukasz On 31.01.2020 at 07:54, Jingsong Li wrote: Hi Łukasz, First, we are planning to design and implement the BoundedStream story, which will

Re: Sorting bounded data stream

2020-01-30 Thread Jingsong Li
Hi Łukasz, First, we are planning to design and implement the BoundedStream story, which will be discussed further in 1.11 or 1.12. SortedMapState was discussed in FLINK-6219 [1], but there are some problems that could not be solved well, so it has not been introduced. If it is a pure

Re: Can DataSet extract internal elements by index?

2020-01-30 Thread 月月
Thanks for the explanation! I'll give it a try. On Wed, Jan 29, 2020 at 11:40 AM, Yun Gao wrote: > > > Also, DataSet itself should not have an API for extracting a specific element by index. If you want to implement this functionality, it seems you would first need to add an index to each element in the DataSet, turning it into Tuple2<index, Edge>, and then perform different operations based on the index in the subsequent map operations~ > > > -- > From: Yun Gao > Send Time: 2020 Jan. 29 (Wed.)
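For reference, a hedged sketch of Yun Gao's suggestion using the DataSet API: DataSetUtils.zipWithIndex attaches a sequential index to each element, after which a specific element can be selected by filtering on the index. The element values here are made up for illustration.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

public class ExtractByIndex {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> edges = env.fromElements("a->b", "b->c", "c->a");

        // Turn each element into Tuple2<index, element>.
        DataSet<Tuple2<Long, String>> indexed = DataSetUtils.zipWithIndex(edges);

        // "Extract" the element at index 1 by filtering on the index field.
        indexed.filter(t -> t.f0 == 1L).print();
    }
}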

Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-30 Thread Tzu-Li (Gordon) Tai
Update: I can confirm my previous guess based on the changes in https://issues.apache.org/jira/browse/FLINK-4280 that was merged for Flink 1.3.0. When upgrading from Flink 1.2.x -> 1.3.0, the new startup position configurations were respected over the checkpointed offsets (only once for the first
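For readers following along, a hedged sketch of the startup position API that FLINK-4280 introduced in 1.3 (the topic, group, and server names are hypothetical); as described above, checkpointed offsets normally take precedence over these settings when restoring from a checkpoint or savepoint:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

// Explicit startup position; alternatives include setStartFromEarliest()
// and setStartFromLatest(). Group offsets are the default behavior.
consumer.setStartFromGroupOffsets();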

Difference between JobManager and JobMaster

2020-01-30 Thread Lu Weizheng
Hi all, Recently I have been reading the source code of Flink. There are both JobManager and JobMaster in the flink-runtime project, and the JobManagerRunner interface says it is a runner which executes a JobMaster. So how do I distinguish the two concepts? Is JobMaster a subset of JobManager? Or is JobMaster a

Re: Task-manager kubernetes pods take a long time to terminate

2020-01-30 Thread Yang Wang
I think if you want to delete your Flink cluster on K8s, then you need to directly delete all the created deployments (jobmanager deployment, taskmanager deployment). As for the configmap and service, you could leave them there if you want to reuse them for the next Flink cluster deployment. What's the status of

Issue with committing Kafka offsets

2020-01-30 Thread RKandoji
Hi Team, I'm running into a strange issue, pasted below: "Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity." I read data from more

Re: Task-manager kubernetes pods take a long time to terminate

2020-01-30 Thread Li Peng
Hi Yun, I'm currently specifying that particular RPC address in my Kubernetes charts for convenience; should I be generating a new one for every deployment? And yes, I am deleting the pods using those commands; I'm just noticing that the task-manager termination process is short-circuited by the

Re: Flink+YARN HDFS replication factor

2020-01-30 Thread Piper Piper
Please disregard my previous email. I found the answer online. I thought writing data to local disk automatically meant the data would be persisted to HDFS. However, Spark writes data (in between shuffles) to local disk only. Thanks On Thu, Jan 30, 2020, 2:00 PM Piper Piper wrote: > Hi Till,

Re: Updating multiple database tables

2020-01-30 Thread Jason Sommer
Hi Dylan, I have a similar use case of saving updates to multiple RDBMS tables. While I'm leaning towards using multiple JDBCOutputFormats to solve the issue, I'm curious about which approach you ended up using. Thanks, Jason On 2018/11/28 21:09:08, Dylan Adams wrote: > Hello, > > I was
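Not having seen Dylan's final solution, here is a hedged sketch of the multiple-JDBCOutputFormat approach mentioned above, using the Flink 1.9-era flink-jdbc module; the driver, database URL, table, and column names are hypothetical:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Row> orders = env
        .fromElements(Row.of(1, 100.0), Row.of(2, 250.0))
        .returns(Types.ROW(Types.INT, Types.DOUBLE));

// One output format per target table; each sink writes independently.
JDBCOutputFormat ordersFormat = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.postgresql.Driver")
        .setDBUrl("jdbc:postgresql://localhost:5432/shop")
        .setQuery("INSERT INTO orders (id, amount) VALUES (?, ?)")
        .finish();

orders.writeUsingOutputFormat(ordersFormat);
// A second stream targeting another table would get its own
// JDBCOutputFormat with its own INSERT/UPDATE statement.

One caveat with this design: writeUsingOutputFormat does not participate in Flink's checkpointing, so it offers no delivery guarantees on failure.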

Re: Flink+YARN HDFS replication factor

2020-01-30 Thread Piper Piper
Hi Till, Thank you for the information! In the case of wide transformations, Spark stores input data on disk between shuffles. So I was wondering if Flink does that as well (even for windows of streaming data), and whether that "storing to disk" is persisted to HDFS and honors the replication

Sorting bounded data stream

2020-01-30 Thread Łukasz Jędrzejewski
Hi all, In Flink 1.9 a couple of changes were introduced to deal with bounded streams, e.g. the BoundedOneInput interface. I'm wondering whether it would be doable to do some kind of global sort after receiving the end-of-input event on a finished data stream source, using only the DataStream API? We have made some

Re: Task-manager kubernetes pods take a long time to terminate

2020-01-30 Thread Yun Tang
Hi Li, Why do you still use 'job-manager' as the jobmanager.rpc.address for the second new cluster? If you use another RPC address, the previous task managers would not try to register with the old one. Take the Flink documentation [1] for K8s as an example. You can list/delete all pods like: kubectl get/delete

DisableGenericTypes is not compatible with Kafka

2020-01-30 Thread Oleksandr Nitavskyi
Hi guys, We have encountered an issue related to the ability to 'disableGenericTypes' (disabling Kryo for the job). It seems like a very nice idea to ensure that nobody introduces a random change which penalizes the performance of the job. The issue we have encountered is that Flink's
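For context, the switch in question (a minimal sketch of real API usage):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With generic types disabled, any type that would fall back to Kryo
// serialization causes the job to fail fast instead of silently running
// with slower serialization.
env.getConfig().disableGenericTypes();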

Re: FsStateBackend vs RocksDBStateBackend

2020-01-30 Thread Ran Zhang
Hi Gordon, Thanks for your reply! Regarding state size - we are at 200-300 GB, but we have a parallelism of 120, which will make each task handle ~2-3 GB of state. (When we submit the job we are setting TM memory to 15 GB.) In this scenario, which state backend will be the best fit? Thanks, Ran On Wed,

Re: ActiveMQ connector

2020-01-30 Thread Piotr Nowojski
Hi, Thanks for sharing the code pointers. > His question actually boils down to one thing, regarding this class [3]. > Does having a HashMap and not a ConcurrentHashMap, in the context of [3], for > unacknowledgedMessages remain thread safe? Yes, it's safe, because it's used in only two places. 1. Under

Re: ActiveMQ connector

2020-01-30 Thread KristoffSC
Hi Piotr, I'm not sure about: "Note that if you want your state (your HashMap) to be actually checkpointed, it must either already be defined as Flink-managed state (like `ListState` in the example [1]), or you must copy the content of your `HashMap` to Flink-managed state during `snapshotState`
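A hedged sketch of the pattern the quoted text describes, assuming a source similar to the ActiveMQ one: the working state lives in a plain HashMap, and snapshotState() copies it into Flink-managed ListState on every checkpoint. Class, field, and state names are hypothetical.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class AckTrackingSource implements SourceFunction<String>, CheckpointedFunction {

    // Working state, mutated from run() and the acknowledge callback.
    private final Map<Long, String> unacknowledgedMessages = new HashMap<>();
    private transient ListState<Tuple2<Long, String>> checkpointedState;
    private volatile boolean running = true;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("unacked-messages",
                        TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {})));
        if (context.isRestored()) {
            // Rebuild the HashMap from the last checkpoint on recovery.
            for (Tuple2<Long, String> entry : checkpointedState.get()) {
                unacknowledgedMessages.put(entry.f0, entry.f1);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Copy the HashMap content into managed state on every checkpoint.
        checkpointedState.clear();
        for (Map.Entry<Long, String> e : unacknowledgedMessages.entrySet()) {
            checkpointedState.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Consume from the broker and emit under the checkpoint lock (omitted).
        while (running) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}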

Re: ActiveMQ connector

2020-01-30 Thread Piotr Nowojski
Hi, Regarding your last question, sorry, I don't know about ActiveMQ connectors. I'm not sure exactly how you are implementing your SourceFunction. Generally speaking, the `run()` method is executed in one thread, and other operations like checkpointing and timers (if any) are executed from another

Re: REST Monitoring Savepoint failed

2020-01-30 Thread Till Rohrmann
Hi Ramya, I think this message is better suited for the user ML list. Which version of Flink are you using? Have you checked the Flink logs to see whether they contain anything suspicious? Cheers, Till On Thu, Jan 30, 2020 at 1:09 PM Ramya Ramamurthy wrote: > Hi, > > I am trying to

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-30 Thread Mark Harris
Hi, Thanks for your help with this. The EMR cluster has 3 × 15 GB VMs, and the Flink cluster is started with: /usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3 Usually the task runs for about 15 minutes before it restarts, usually due to a "java.lang.OutOfMemoryError:

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-30 Thread Piotr Nowojski
Hi, What is your job setup? Size of the nodes, memory settings of Flink/the JVM? 9,041,060 strings is an awfully small number to bring down a whole cluster. With each tmp string taking ~30 bytes, that's only ~271 MB. Is this really 85% of the heap? And also, with a parallelism of 6 and checkpoints

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-30 Thread Mark Harris
Trying a few different approaches to the fs.s3a.fast.upload settings has brought me no joy - the taskmanagers end up simply crashing or complaining of high GC load. Heap dumps suggest that this time they're clogged with buffers instead, which makes sense. Our job has a parallelism of 6 and

Re: Reinterpreting a pre-partitioned data stream as keyed stream

2020-01-30 Thread KristoffSC
Hi, thank you for the answer. I think I understand. In my use case I have to keep the order of events for each key, but I don't have to process keys in the same order that I received them. At one point in my pipeline I'm also using a SessionWindow. My Flink environment has operator chaining
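For reference, a hedged sketch of the API this thread is about (experimental in this Flink version); the stream contents are made up for illustration:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Long>> prePartitioned = env
        .fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG));

// Assumes prePartitioned is already partitioned exactly as keyBy(f0) would
// partition it; reinterpretation skips the shuffle but NOT the correctness
// requirement.
KeyedStream<Tuple2<String, Long>, String> keyed =
        DataStreamUtils.reinterpretAsKeyedStream(
                prePartitioned, t -> t.f0, Types.STRING);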

Re: Does flink support retries on checkpoint write failures

2020-01-30 Thread Arvid Heise
If a checkpoint is not successful, it cannot be used for recovery. That means Flink will restart from the last successful checkpoint and hence not lose any data. On Wed, Jan 29, 2020 at 9:52 PM wvl wrote: > Forgive my lack of knowledge here - I'm a bit out of my league here. > > But I was
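A hedged sketch of the related knob, assuming Flink 1.9+ where CheckpointConfig exposes it; the numbers are arbitrary:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60s

// Tolerate up to 3 checkpoint failures before failing the job; a failed
// checkpoint is discarded and recovery falls back to the last good one.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);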

Re: Using s3 for checkpointing

2020-01-30 Thread Arvid Heise
Hi Navneeth, did you follow the plugin folder structure? [1] There is another plugin called flink-s3-fs-presto that you can use. If you want to use both plugins, use s3a:// for s3-fs-hadoop (output) and s3p:// for s3-fs-presto (checkpointing). [1]
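A hedged sketch of what that scheme split looks like in job code (bucket names hypothetical, assuming Flink 1.9-era APIs):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoints go through the presto plugin (s3p://)...
env.setStateBackend(new FsStateBackend("s3p://my-bucket/checkpoints"));

DataStream<String> stream = env.fromElements("a", "b", "c");

// ...while file output goes through the hadoop plugin (s3a://).
stream.addSink(StreamingFileSink
        .forRowFormat(new Path("s3a://my-bucket/output"),
                new SimpleStringEncoder<String>("UTF-8"))
        .build());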

[ANNOUNCE] Community Discounts for Flink Forward SF 2020 Registrations

2020-01-30 Thread Fabian Hueske
Hi everyone, The registration for Flink Forward SF 2020 is open now! Flink Forward San Francisco 2020 will take place from March 23rd to 25th. The conference will start with one day of training and continue with two days of keynotes and talks. We would like to invite you to join the Apache Flink

Using s3 for checkpointing

2020-01-30 Thread Navneeth Krishnan
Hi All, I'm trying to migrate from NFS to S3 for checkpointing and I'm facing a few issues. I have Flink running in Docker with the flink-s3-fs-hadoop jar copied to the plugins folder. Even after adding the jar I'm getting the following error: Caused by:

How to fully re-aggregate a keyed windowed aggregate in the same window ?

2020-01-30 Thread LINZ, Arnaud
Hello, I would like to compute statistics on a stream every hour. For that, I need to compute statistics on the keyed stream, then re-aggregate them. I've tried the following:

stream.keyBy(mykey)
      .window(1 hour processing time)
      .aggregate(my per-key aggregate)
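One commonly suggested continuation of this pipeline (a hedged sketch, not necessarily what the author ended up with, assuming an upstream events stream of Tuple2<String, Long> and sum as a stand-in for the real statistics): feed the per-key window results into a parallelism-1 windowAll over the same window size.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Per-key hourly aggregate.
DataStream<Tuple2<String, Long>> perKey = events
        .keyBy(0)
        .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
        .sum(1);

// Global re-aggregation of the per-key results, running at parallelism 1.
DataStream<Tuple2<String, Long>> global = perKey
        .windowAll(TumblingProcessingTimeWindows.of(Time.hours(1)))
        .sum(1);

The caveat with this design is the crux of the question: with processing time, the global windowAll fires up to one window length after the keyed window emits, rather than "in the same window".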