Can you post the relevant code?

On Fri, Nov 27, 2015 at 4:25 AM, u...@moosheimer.com <u...@moosheimer.com> wrote:
> Hi,
>
> we have some strange behavior with KafkaUtils DirectStream and the size of
> the MapPartitionsRDDs.
>
> We use a permanent direct stream where we consume about 8,500 json
> messages/sec. The json messages are read, some information is extracted,
> and the result of each json is a string which we collect/group with
> reduceByKeyAndWindow.
>
> The windowLength and slideInterval are both 60 sec. The result of the
> window is sent back to another Kafka topic. The batch duration is
> 20 seconds.
>
> The RDDs are growing all the time, and the data transferred to the
> executors is growing and growing. Since the RDDs are growing, the time to
> send them over the network and the time for processing is growing, too.
>
> We start with a processing time of 4 seconds. After 20 hours we reach
> 10 seconds. The processing time keeps growing, and at some point it is
> permanently over 20 seconds (the batch duration), and Spark runs OOM or
> gets other problems.
>
> Has anybody any idea how to fix this?
>
> We are using Spark 1.5.2 and Kafka 0.8.2.2. We read two Kafka topics - one
> with 200 Kafka partitions and one with 20 Kafka partitions.
> Spark runs in standalone cluster mode with three instances on AWS.
> We use 3 worker nodes with 23 executors in total.
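For readers unfamiliar with the windowing described above: with a 20-second batch duration and a 60-second window/slide, each window combines three consecutive batches, reducing values per key across all of them. A minimal plain-Python sketch of those semantics (no Spark required; the keys and values are hypothetical stand-ins for the strings extracted from the json messages):

```python
# Plain-Python model of reduceByKeyAndWindow semantics, assuming:
# batch duration = 20 s, windowLength = slideInterval = 60 s,
# so each non-overlapping window spans exactly 3 batches.

BATCH_SECONDS = 20
WINDOW_SECONDS = 60
batches_per_window = WINDOW_SECONDS // BATCH_SECONDS  # 3

def reduce_by_key_over_window(batches, reduce_fn):
    """Combine (key, value) pairs from every batch in one window,
    merging values that share a key with reduce_fn."""
    acc = {}
    for batch in batches:
        for key, value in batch:
            acc[key] = reduce_fn(acc[key], value) if key in acc else value
    return acc

# Three 20-second batches of extracted (key, value) pairs (hypothetical data)
window = [
    [("sensor-a", 1), ("sensor-b", 2)],
    [("sensor-a", 3)],
    [("sensor-b", 4), ("sensor-c", 5)],
]
result = reduce_by_key_over_window(window, lambda a, b: a + b)
print(result)  # {'sensor-a': 4, 'sensor-b': 6, 'sensor-c': 5}
```

The key point for the problem in this thread: once a window closes, its source RDDs should become eligible for cleanup; steadily growing RDDs suggest old window data is being retained rather than released.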
>
> We start the app with these parameters:
> --conf "spark.akka.frameSize=160"
> --conf "spark.cleaner.referenceTracking.blocking=true"
> --conf "spark.cleaner.referenceTracking.blocking.shuffle=true"
> --conf "spark.cleaner.referenceTracking.cleanCheckpoints=true"
> --conf "spark.cleaner.ttl=600"
> --conf "spark.default.parallelism=69"
> --conf "spark.executor.cores=1"
> --conf "spark.executor.memory=7g"
> --conf "spark.kryoserializer.buffer.max=256m"
> --conf "spark.rdd.compress=true"
> --conf "spark.storage.memoryFraction=0.2"
> --conf "spark.streaming.backpressure.enabled=true"
> --conf "spark.streaming.kafka.maxRatePerPartition=75"
> --conf "spark.streaming.receiver.maxRate=15000"
> --conf "spark.streaming.stopGracefullyOnShutdown=true"
> --deploy-mode cluster
> --supervise
>
> Thanks,
> Uwe
>
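As a quick sanity check on the rate limiting in that config: `spark.streaming.kafka.maxRatePerPartition` caps each Kafka partition's ingest per second, so the per-batch ceiling can be worked out with simple arithmetic (assuming the limit applies across all 220 partitions of the two topics combined):

```python
# Back-of-envelope check of the ingest ceiling implied by the config above.
# Assumption: maxRatePerPartition applies to every partition of both topics.
max_rate_per_partition = 75    # spark.streaming.kafka.maxRatePerPartition
kafka_partitions = 200 + 20    # two topics: 200 + 20 partitions
batch_seconds = 20             # batch duration

max_records_per_batch = max_rate_per_partition * kafka_partitions * batch_seconds
print(max_records_per_batch)   # 330000

observed_rate = 8500           # messages/sec reported in the thread
observed_per_batch = observed_rate * batch_seconds
print(observed_per_batch)      # 170000
```

The observed load (about 170,000 records per batch) is well under the configured ceiling (330,000), which points away from ingest volume and toward retained state or cleanup behavior as the cause of the growing RDDs.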