Passing records between two jobs

2018-06-18 Thread Avihai Berkovitz
Hello,

We are planning a system that will be comprised of 3 different jobs:

  1.  Getting a stream of events, adding some metadata to the events, and 
outputting them to a temporary message queue.
  2.  Performing some calculations on the events we got from job 1, as required 
for product A.
  3.  Performing a different set of calculations on the events from job 1, as 
required for product B.

All 3 jobs will be developed by different teams, so we don't want to create one 
massive job that does everything.
The problem is that every message queuing sink only provides an at-least-once 
guarantee. If job 1 crashes and recovers, we will get the same events in the 
queue again, and jobs 2 and 3 will process those events twice. This is obviously 
a problem, and I guess we are not the first to stumble upon it.
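
For illustration, one common mitigation is to deduplicate in the consuming jobs by a 
unique event ID (assuming job 1 can stamp such an ID onto every event). A rough 
sketch in the DataStream API; "MyEvent" and its getId() are hypothetical placeholders:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Drops any event whose ID has already been seen by this keyed operator.
public class DedupByEventId extends RichFlatMapFunction<MyEvent, MyEvent> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void flatMap(MyEvent event, Collector<MyEvent> out) throws Exception {
        if (seen.value() == null) {   // first time this event ID arrives
            seen.update(true);
            out.collect(event);       // later replays of the same event are swallowed
        }
    }
}

// Usage in job 2 or 3: fromQueue.keyBy(MyEvent::getId).flatMap(new DedupByEventId())

In practice the "seen" state would also need some expiry or cleanup so it does not 
grow without bound, but it shows the idea.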

Has anyone else had this issue? It seems to me like a fundamental problem of 
passing data between jobs, so hopefully there are known solutions and best 
practices. It would be great if you could share any solutions.

Thanks,
Avihai



Standalone cluster layout

2016-12-13 Thread Avihai Berkovitz
Hi folks,

I am setting up a Flink cluster for testing, and I have a few questions 
regarding memory allocations:

  1.  Is there a recommended limit to the size of a TaskManager heap? I saw 
that Flink uses G1GC, so we can use dozens of GB.
  2.  Following the above question, should I use only one TaskManager process 
per machine, and give it all the available memory (minus a couple of GB for the 
OS)?
  3.  Should I reserve some memory for RocksDB? The partitioned state will be 
around 500GB in size, and to my understanding RocksDB runs in native code and 
so uses off-heap memory.
  4.  What is the recommended heap size of a JobManager? I expect that the 
cluster will run only 2 jobs at the same time.

The planned layout of the standalone cluster is:

  *   3 small JobManager machines, each running:
      *   1 ZooKeeper peer process
      *   1 JobManager process
  *   N large TaskManager machines, each running 1 TM process

Thanks!
Avihai



RE: Standalone cluster layout

2016-12-15 Thread Avihai Berkovitz
Thank you for the answers. The cluster will run in Azure, so I will be using 
HDFS over Azure Blob Store, as outlined in 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Azure-Blob-Storage-Connector-td8536.html
I got pretty good performance in my tests, and it should handle scaling well. 
We will see how it performs under real production loads.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Wednesday, December 14, 2016 4:59 PM
To: user@flink.apache.org
Subject: Re: Standalone cluster layout

Hi Avihai,

1. As much as possible (I would leave the operating system at least 1 GB of 
memory). It also depends on the workload you have. For streaming workloads with 
very small state, you can use Flink with 1-2 GB of heap space and still get 
very good performance.
2. Yes, I would recommend running one large TaskManager per machine, because you 
save on "management overhead" and benefit from faster local data transfers.
3. If you give your TaskManagers, say, 10 GB of heap, it's likely that the process 
in the OS is using ~12 GB of memory in total (our network stack also uses some 
off-heap memory). You can fine-tune the (memory) behavior of RocksDB, but by 
default it is not using a lot of memory (a rough sketch of that tuning knob 
follows after answer 4).

4. I would give it at least 2 GB; if you run multiple jobs or larger jobs (high 
parallelism, many machines, many tasks), then maybe even more.
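
A minimal sketch of the RocksDB tuning mentioned in answer 3, assuming the Flink 1.x 
RocksDB backend API (the checkpoint URI is just a placeholder):

import java.io.IOException;

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

public class TunedRocksDbBackend {

    // Returns a RocksDB backend using one of the built-in option profiles,
    // which trade native memory for throughput (spinning disk vs. SSD, etc.).
    static RocksDBStateBackend create() throws IOException {
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints");  // placeholder URI
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        return backend;
    }
}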


The layout of the standalone cluster looks good.
Where are you planning to write the state checkpoints to? Given that you have 
500 GB of state, you should consider how to store that state somewhere reliable. 
For larger states, it's recommended to have a good network connection between the 
workers (machines running TMs) and the distributed file system (say S3, HDFS, ...).
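
A minimal skeleton of that setup (checkpointing enabled, RocksDB state backend 
pointed at a reliable shared file system; the URI and interval are placeholders, 
and in this thread the URI would be the wasbs:// path mentioned earlier):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSkeleton {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot the (large) RocksDB state periodically; 60s is an arbitrary example.
        env.enableCheckpointing(60_000);

        // Checkpoints go to a distributed file system reachable from all TaskManagers.
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));

        // ... sources, operators and sinks go here ...

        env.execute("checkpointed-job");
    }
}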







Possible JVM native memory leak

2017-01-17 Thread Avihai Berkovitz
Hello,

I am running a streaming job on a small cluster, and after a few hours I 
noticed that my TaskManager processes are being killed by the OOM killer. The 
processes were using too much memory. After a bit of monitoring, I have the 
following status:

  *   The maximum heap size (Xmx) is 4M
  *   Native Memory Tracking reports that the process has 44180459KB committed, 
which is reasonable given the GC and threads overhead (the full summary report 
is included below)
  *   There are 644 threads
  *   The Status.JVM.Memory.NonHeap.Committed metric is 245563392
  *   The Status.JVM.Memory.Direct.MemoryUsed metric is 354777032
  *   Using pmap we find that the private committed memory is 54879428K and 
mapped is 62237852K

So we have about 10GB of memory that was allocated in the process but is 
unknown to the JVM itself.

Some more info:

  *   I am running Flink 1.1.4
  *   I am using RocksDB for state
  *   The state is saved to Azure Blob Store, using the NativeAzureFileSystem 
HDFS connector over the wasbs protocol
  *   The cluster is a standalone HA cluster
  *   The machine is an Ubuntu 14.04.5 LTS 64 bit server
  *   I have around 2 GB of state per TaskManager

Another thing I noticed is that the job sometimes fails (due to external DB 
connectivity issues) and is restarted automatically as expected. But in some 
cases the failures also cause one or more of the following error logs:

  *   Could not close the file system output stream. Trying to delete the 
underlying file.
  *   Could not discard the 1th operator state of checkpoint 93 for operator 
Operator2.
java.lang.NullPointerException: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:953) ~[flink-dist_2.10-1.1.4.jar:1.1.4]

I have 2 theories, and I hope to hear any ideas from you:

  1.  RocksDB uses this memory for internal caching. If so, how can this usage 
be tracked, and what options can tune and limit it? (a sketch of such options 
follows after this list)
  2.  Internal RocksDB objects are not being disposed of properly, probably 
during the aforementioned job restarts, and so we have a growing memory leak. 
If so, do you have any idea what can cause this?
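
For theory 1, the knobs that bound RocksDB's native memory (write buffers, block 
cache) can be plugged in through the backend's options factory. A rough sketch, 
assuming the options-factory interface of later Flink 1.x releases; the sizes are 
placeholder values, not recommendations:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Caps the per-column-family memtables and the block cache used for reads.
public class BoundedRocksDbOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;  // DB-level options left at Flink's defaults
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
                .setWriteBufferSize(64 * 1024 * 1024)   // 64 MB per memtable
                .setMaxWriteBufferNumber(2)             // at most 2 memtables in memory
                .setTableFormatConfig(
                        new BlockBasedTableConfig()
                                .setBlockCacheSize(256 * 1024 * 1024));  // 256 MB block cache
    }
}

// Wiring it up on the backend: rocksDbBackend.setOptions(new BoundedRocksDbOptions());

Note that each stateful operator instance opens its own RocksDB, so the effective 
total is a multiple of these per-instance limits.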

Thank you,
Avihai


Attached Native Memory Tracking (jcmd <pid> VM.native_memory summary):

Total: reserved=44399603KB, committed=44180459KB
- Java Heap (reserved=4096KB, committed=4096KB)
(mmap: reserved=4096KB, committed=4096KB)

- Class (reserved=134031KB, committed=132751KB)
(classes #22310)
(malloc=2959KB #43612)
(mmap: reserved=131072KB, committed=129792KB)

- Thread (reserved=716331KB, committed=716331KB)
(thread #694)
(stack: reserved=712404KB, committed=712404KB)
(malloc=2283KB #3483)
(arena=1644KB #1387)

- Code (reserved=273273KB, committed=135409KB)
(malloc=23673KB #30410)
(mmap: reserved=249600KB, committed=111736KB)

- GC (reserved=1635902KB, committed=1635902KB)
(malloc=83134KB #70605)
(mmap: reserved=1552768KB, committed=1552768KB)

- Compiler (reserved=1634KB, committed=1634KB)
(malloc=1504KB #2062)
(arena=131KB #3)

- Internal (reserved=575283KB, committed=575283KB)
(malloc=575251KB #106644)
(mmap: reserved=32KB, committed=32KB)

- Symbol (reserved=16394KB, committed=16394KB)
(malloc=14468KB #132075)
(arena=1926KB #1)

- Native Memory Tracking (reserved=6516KB, committed=6516KB)
(malloc=338KB #5024)
(tracking overhead=6178KB)

- Arena Chunk (reserved=237KB, committed=237KB)
(malloc=237KB)

- Unknown (reserved=8KB, committed=0KB)
(mmap: reserved=8KB, committed=0KB)