Passing records between two jobs

2018-06-18 Thread Avihai Berkovitz
Hello,

We are planning a system that will be comprised of 3 different jobs:

  1.  Getting a stream of events, adding some metadata to the events, and 
outputting them to a temporary message queue.
  2.  Performing some calculations on the events we got from job 1, as required 
for product A.
  3.  Performing a different set of calculations on the events from job 1, for 
product B.

All 3 jobs will be developed by different teams, so we don't want to create one 
massive job that does everything.
The problem is that every message queue sink provides only an at-least-once 
guarantee. If job 1 crashes and recovers, it will write the same events to the 
queue again, and jobs 2 and 3 will process those events twice. This is 
obviously a problem, and I guess we are not the first to stumble upon it.

Has anyone else had this issue? It seems to me like a fundamental problem of 
passing data between jobs, so hopefully there are known solutions and best 
practices. It would be great if you could share any solution.
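
[Editor's sketch, not part of the original message.] The duplicate-delivery scenario above can be reproduced in a few lines, together with one common mitigation: job 1 attaches a stable ID to each event and the downstream jobs drop IDs they have already seen. The `id` field, the `Deduplicator` class and `replay_with_crash` are hypothetical names used only for illustration. In a real job the set of seen IDs would have to live in the downstream job's own checkpointed state, and the IDs would have to be identical across replays.

```python
from collections import OrderedDict


class Deduplicator:
    """Remembers the most recent event IDs and rejects repeats (hypothetical helper)."""

    def __init__(self, max_ids=10_000):
        self.max_ids = max_ids
        self._seen = OrderedDict()

    def is_new(self, event_id):
        if event_id in self._seen:
            return False
        self._seen[event_id] = True
        if len(self._seen) > self.max_ids:
            self._seen.popitem(last=False)  # evict the oldest remembered ID
        return True


def replay_with_crash(events, crash_after, resume_from):
    """Simulate job 1: write `crash_after` events, crash, then replay from
    the last checkpoint position `resume_from` (at-least-once delivery)."""
    queue = list(events[:crash_after])
    queue.extend(events[resume_from:])
    return queue


events = [{"id": i, "value": i * 10} for i in range(5)]
queue = replay_with_crash(events, crash_after=4, resume_from=2)

dedup = Deduplicator()
processed = [e["id"] for e in queue if dedup.is_new(e["id"])]

print([e["id"] for e in queue])  # the queue contains events 2 and 3 twice
print(processed)                 # each event ID is processed once
```

The bounded `OrderedDict` is a simplification: it only catches duplicates that arrive within the last `max_ids` events, so the bound has to cover the longest possible replay window.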

Thanks,
Avihai



Possible JVM native memory leak

2017-01-17 Thread Avihai Berkovitz
Hello,

I am running a streaming job on a small cluster, and after a few hours I 
noticed that my TaskManager processes are being killed by the OOM killer. The 
processes were using too much memory. After a bit of monitoring, I have the 
following status:

  *   The maximum heap size (Xmx) is 4M
  *   Native Memory Tracking reports that the process has 44180459KB committed, 
which is reasonable given the GC and thread overhead (the full summary report 
is attached below)
  *   There are 644 threads
  *   The Status.JVM.Memory.NonHeap.Committed metric is 245563392
  *   The Status.JVM.Memory.Direct.MemoryUsed metric is 354777032
  *   Using pmap we find that the private committed memory is 54879428K and 
mapped is 62237852K

So we have about 10GB of memory that was allocated in the process but is 
unknown to the JVM itself.
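
[Editor's sketch, not part of the original message.] The "about 10GB" figure is the difference between the pmap private committed size and the Native Memory Tracking total, both taken from the list above. The variable names are illustrative only.

```python
KB_PER_GB = 1024 * 1024

nmt_committed_kb = 44180459  # NMT "Total: committed"
pmap_private_kb = 54879428   # pmap private committed memory

# Memory the OS attributes to the process but the JVM does not track.
untracked_kb = pmap_private_kb - nmt_committed_kb
untracked_gb = untracked_kb / KB_PER_GB

print(f"{untracked_kb} KB untracked (~{untracked_gb:.1f} GB)")
```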

Some more info:

  *   I am running Flink 1.1.4
  *   I am using RocksDB for state
  *   The state is saved to Azure Blob Store, using the NativeAzureFileSystem 
HDFS connector over the wasbs protocol
  *   The cluster is a standalone HA cluster
  *   The machine is an Ubuntu 14.04.5 LTS 64 bit server
  *   I have around 2 GB of state per TaskManager

Another thing I noticed is that the job sometimes fails (due to external DB 
connectivity issues) and is restarted automatically as expected. But in some 
cases the failures also cause one or more of the following error logs:

  *   Could not close the file system output stream. Trying to delete the 
underlying file.
  *   Could not discard the 1th operator state of checkpoint 93 for operator 
Operator2.
java.lang.NullPointerException: null

    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:953) ~[flink-dist_2.10-1.1.4.jar:1.1.4]

I have 2 theories, and I hope to hear any ideas from you:

  1.  RocksDB uses this memory for internal caching. If so, how can this usage 
be tracked, and what options can tune and limit it?
  2.  Internal RocksDB objects are not being disposed of properly, probably 
during the aforementioned job restarts, and so we have a growing memory leak. 
If so, do you have any idea what can cause this?
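
[Editor's sketch, not part of the original message.] For theory 1, a rough way to reason about RocksDB's native footprint is to add up the memtables (write buffer size times the maximum number of write buffers) and the block cache for every state that RocksDB backs. The function name and all input values below are hypothetical and are not measurements from this cluster. The estimate also ignores index and filter blocks and memory pinned by iterators, so it is only a starting point.

```python
MB = 1024 * 1024


def rocksdb_memory_estimate(num_states, write_buffer_size,
                            max_write_buffers, block_cache_size):
    """Rough estimate in bytes: memtables plus block cache, per RocksDB-backed state.

    All parameters are assumptions supplied by the caller, not values
    read from a running RocksDB instance.
    """
    memtables = write_buffer_size * max_write_buffers
    return num_states * (memtables + block_cache_size)


# Hypothetical inputs: 40 RocksDB-backed states on one TaskManager.
estimate = rocksdb_memory_estimate(
    num_states=40,
    write_buffer_size=64 * MB,
    max_write_buffers=2,
    block_cache_size=8 * MB,
)
print(f"~{estimate // MB} MB of native memory")
```

If an estimate of this kind lands far below the untracked amount, that would point more toward theory 2 (objects not being released) than toward caching.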

Thank you,
Avihai


Attached Native Memory Tracking summary (jcmd <pid> VM.native_memory summary):

Total: reserved=44399603KB, committed=44180459KB

- Java Heap (reserved=4096KB, committed=4096KB)
    (mmap: reserved=4096KB, committed=4096KB)

- Class (reserved=134031KB, committed=132751KB)
    (classes #22310)
    (malloc=2959KB #43612)
    (mmap: reserved=131072KB, committed=129792KB)

- Thread (reserved=716331KB, committed=716331KB)
    (thread #694)
    (stack: reserved=712404KB, committed=712404KB)
    (malloc=2283KB #3483)
    (arena=1644KB #1387)

- Code (reserved=273273KB, committed=135409KB)
    (malloc=23673KB #30410)
    (mmap: reserved=249600KB, committed=111736KB)

- GC (reserved=1635902KB, committed=1635902KB)
    (malloc=83134KB #70605)
    (mmap: reserved=1552768KB, committed=1552768KB)

- Compiler (reserved=1634KB, committed=1634KB)
    (malloc=1504KB #2062)
    (arena=131KB #3)

- Internal (reserved=575283KB, committed=575283KB)
    (malloc=575251KB #106644)
    (mmap: reserved=32KB, committed=32KB)

- Symbol (reserved=16394KB, committed=16394KB)
    (malloc=14468KB #132075)
    (arena=1926KB #1)

- Native Memory Tracking (reserved=6516KB, committed=6516KB)
    (malloc=338KB #5024)
    (tracking overhead=6178KB)

- Arena Chunk (reserved=237KB, committed=237KB)
    (malloc=237KB)

- Unknown (reserved=8KB, committed=0KB)
    (mmap: reserved=8KB, committed=0KB)



Standalone cluster layout

2016-12-13 Thread Avihai Berkovitz
Hi folks,

I am setting up a Flink cluster for testing, and I have a few questions 
regarding memory allocations:

  1.  Is there a recommended limit to the size of a TaskManager heap? I saw 
that Flink uses G1GC, so I assume heaps of dozens of GB are feasible.
  2.  Following the above question, should I use only one TaskManager process 
per machine, and give it all the available memory (minus a couple of GB for the 
OS)?
  3.  Should I reserve some memory for RocksDB? The partitioned state will be 
around 500GB in size, and to my understanding RocksDB runs in native code and 
so uses off-heap memory.
  4.  What is the recommended heap size of a JobManager? I expect that the 
cluster will run only 2 jobs at the same time.
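
[Editor's sketch, not part of the original message.] Questions 2 and 3 come down to a per-machine memory budget: whatever is left after the OS reserve and a native (off-heap) reserve for RocksDB can go to the TaskManager heap. The function names, the machine size, the native reserve and the TaskManager count below are hypothetical placeholders. Only the "couple of GB for the OS" and the 500GB total state come from the questions above.

```python
def taskmanager_heap_gb(machine_ram_gb, os_reserve_gb, native_reserve_gb):
    """Heap left for one TaskManager after OS and native (RocksDB) reserves."""
    heap = machine_ram_gb - os_reserve_gb - native_reserve_gb
    if heap <= 0:
        raise ValueError("reservations exceed machine RAM")
    return heap


def state_per_machine_gb(total_state_gb, num_taskmanagers):
    """Partitioned state each TaskManager holds if keys are spread evenly."""
    return total_state_gb / num_taskmanagers


# Hypothetical cluster: 10 TaskManager machines with 64 GB of RAM each.
heap_gb = taskmanager_heap_gb(machine_ram_gb=64, os_reserve_gb=2,
                              native_reserve_gb=16)
state_gb = state_per_machine_gb(total_state_gb=500, num_taskmanagers=10)

print(f"heap per TaskManager: {heap_gb} GB")
print(f"state per TaskManager: {state_gb} GB")
```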

The planned layout of the standalone cluster is:

  *   3 small JobManager machines, each running:
      *   1 ZooKeeper peer process
      *   1 JobManager process
  *   N large TaskManager machines, each running 1 TM process

Thanks!
Avihai