Passing records between two jobs
Hello, We are planning a system that will be comprised of 3 different jobs: 1. Getting a stream of events, adding some metadata to the events, and outputting them to a temporary message queue. 2. Performing some calculations on the events we got from job 1, as required for product A. 3. Performing a different set of calculations of the events from job 1, for product B. All 3 jobs will be developed by different teams, so we don't want to create one massive job that does everything. The problem is that every message queuing sink only provides at-least-once guarantee. If job 1 crashes and recovers, we will get the same events in the queue and jobs 2 and 3 will process events twice. This is obviously a problem, and I guess we are not the first to stumble upon it. Did anyone else had this issue? It seems to me like a fundamental problem of passing data between jobs, so hopefully there are known solutions and best practices. It would be great if you can share any solution. Thanks, Avihai
Standalone cluster layout
Hi folks, I am setting up a Flink cluster for testing, and I have a few questions regarding memory allocations: 1. Is there a recommended limit to the size of a TaskManager heap? I saw that Flink uses G1GC, so we can use dozens of GB. 2. Following the above question, should I use only one TaskManager process per machine, and give it all the available memory (minus a couple of GB for the OS)? 3. Should I reserve some memory for RocksDB? The partitioned state will be around 500GB in size, and to my understanding RocksDB runs in native code and so uses off-heap memory. 4. What is the recommended heap size of a JobManager? I expect that the cluster will run only 2 jobs at the same time. The planned layout of the standalone cluster is: * 3 small JobManager machines, running: * 1 process of Zookeeper peer * 1 JobManager process * N large TaskManager machines, each running 1 TM process Thanks! Avihai
RE: Standalone cluster layout
Thank you for the answers. The cluster will run in Azure, so I will be using HDFS over Azure Blob Store, as outlined in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Azure-Blob-Storage-Connector-td8536.html I got pretty good performance in my tests, and it should handle scaling well. We will see how it performs under real production loads. From: Robert Metzger [mailto:rmetz...@apache.org] Sent: Wednesday, December 14, 2016 4:59 PM To: user@flink.apache.org Subject: Re: Standalone cluster layout Hi Avihai, 1. As much as possible (I would leave the operating system at least 1 GB of memory). It depends also on the workload you have. For streaming workload with very small state, you can use Flink with 1-2 GB of heap space and still get very good performance. 2. Yes, I would recommend to run one large Taskmanager per machine, because you save on "management overhead" and you benefit from faster data transfers locally. 3. If you give your Taskmanagers say 10 GB of heap, its likely that the process in the OS is using ~12 GB of memory in total (our network stack is also using some offheap memory). You can fine-tune the (memory) behavior of Rocks, but by default its not using a lot of memory. 4. I would give it at least 2 GB, if you run multiple jobs or larger jobs (high parallelism, many machines, many tasks), than maybe even more. The layout of the standalone cluster looks good. Where are you planning to write the state checkpoints to? Given that you have 500 Gb of state, you should consider how you want to store that state somewhere reliably. For larger states, its recommended to have a good network connection between the workers (machines running TMs) and the distributed file system (say S3, HDFS, ...). On Tue, Dec 13, 2016 at 5:41 PM, Avihai Berkovitz mailto:avihai.berkov...@microsoft.com>> wrote: Hi folks, I am setting up a Flink cluster for testing, and I have a few questions regarding memory allocations: 1. Is there a recommended limit to the size of a TaskManager heap? I saw that Flink uses G1GC, so we can use dozens of GB. 2. Following the above question, should I use only one TaskManager process per machine, and give it all the available memory (minus a couple of GB for the OS)? 3. Should I reserve some memory for RocksDB? The partitioned state will be around 500GB in size, and to my understanding RocksDB runs in native code and so uses off-heap memory. 4. What is the recommended heap size of a JobManager? I expect that the cluster will run only 2 jobs at the same time. The planned layout of the standalone cluster is: * 3 small JobManager machines, running: * 1 process of Zookeeper peer * 1 JobManager process * N large TaskManager machines, each running 1 TM process Thanks! Avihai
Possible JVM native memory leak
Hello, I am running a streaming job on a small cluster, and after a few hours I noticed that my TaskManager processes are being killed by the OOM killer. The processes were using too much memory. After a bit of monitoring, I have the following status: * The maximum heap size (Xmx) is 4M * Native Memory Tracking reports that the process has 44180459KB committed, which is reasonable given the GC and threads overhead (the full summery report is attached later) * There are 644 threads * The Status.JVM.Memory.NonHeap.Committed metric is 245563392 * The Status.JVM.Memory.Direct.MemoryUsed metric is 354777032 * Using pmap we find that the private committed memory is 54879428K and mapped is 62237852K So we have about 10GB of memory that was allocated in the process but is unknown to the JVM itself. Some more info: * I am running Flink 1.1.4 * I am using RocksDB for state * The state is saved to Azure Blob Store, using the NativeAzureFileSystem HDFS connector over the wasbs protocol * The cluster is a standalone HA cluster * The machine is an Ubuntu 14.04.5 LTS 64 bit server * I have around 2 GB of state per TaskManager Another thing I noticed is that the job sometimes fails (due to external DB connectivity issues) and is restarted automatically as expected. But in some cases the failures also cause one or more of the following error logs: * Could not close the file system output stream. Trying to delete the underlying file. * Could not discard the 1th operator state of checkpoint 93 for operator Operator2. java.lang.NullPointerException: null at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:953) ~[flink-dist_2.10-1.1.4.jar:1.1.4] I have 2 theories, and I hope to hear any ideas from you: 1. RocksDB uses this memory for internal caching. If so, how can this usage be tracked, and what options can tune and limit it? 2. Internal RocksDB objects are not being disposed of properly, probably during the aforementioned job restarts, and so we have a growing memory leak. If so, do you have any idea what can cause this? Thank you, Avihai Attached Native Memory Tracking (jcmd VM.native_memory summary): Total: reserved=44399603KB, committed=44180459KB - Java Heap (reserved=4096KB, committed=4096KB) (mmap: reserved=4096KB, committed=4096KB) - Class (reserved=134031KB, committed=132751KB) (classes #22310) (malloc=2959KB #43612) (mmap: reserved=131072KB, committed=129792KB) -Thread (reserved=716331KB, committed=716331KB) (thread #694) (stack: reserved=712404KB, committed=712404KB) (malloc=2283KB #3483) (arena=1644KB #1387) - Code (reserved=273273KB, committed=135409KB) (malloc=23673KB #30410) (mmap: reserved=249600KB, committed=111736KB) -GC (reserved=1635902KB, committed=1635902KB) (malloc=83134KB #70605) (mmap: reserved=1552768KB, committed=1552768KB) - Compiler (reserved=1634KB, committed=1634KB) (malloc=1504KB #2062) (arena=131KB #3) - Internal (reserved=575283KB, committed=575283KB) (malloc=575251KB #106644) (mmap: reserved=32KB, committed=32KB) -Symbol (reserved=16394KB, committed=16394KB) (malloc=14468KB #132075) (arena=1926KB #1) -Native Memory Tracking (reserved=6516KB, committed=6516KB) (malloc=338KB #5024) (tracking overhead=6178KB) - Arena Chunk (reserved=237KB, committed=237KB) (malloc=237KB) - Unknown (reserved=8KB, committed=0KB) (mmap: reserved=8KB, committed=0KB)