Can you post the relevant code?

On Fri, Nov 27, 2015 at 4:25 AM, u...@moosheimer.com <u...@moosheimer.com>
wrote:

> Hi,
>
> we have some strange behavior with KafkaUtils DirectStream and the size of
> the MapPartitionsRDDs.
>
> We use a permanent direct steam where we consume about 8.500 json
> messages/sec.
> The json messages are read, some information are extracted and the result
> of each json is a string which collect/group with reduceByKeyAndWindow.
>
> The windowLength and slideInterval are both 60 sec.
> The result of the window will be send back to another Kafka topic.
> The batch duration is 20 seconds.
>
> The RDDs are growing all the time and the data transfered to the executors
> is growing and growing.
> Since the RDDs are growing, the time to send them over the network and the
> time for processing is growing, too.
>
> We start with a processing time of 4 seconds. After 20 hours we reach 10
> seconds.
> The processing time is growing and growing and on some point the
> processing time is permanently over 20 seconds (the batch duration time)
> and Spark will run OOM or get other problems.
>
> Has anybody any idea how to fix this?
>
> We are using Spark 1.5.2 and Kafka 0.8.2.2. We read two Kafka topics - one
> with 200 Kafka partitions and one with 20 Kafka partitions.
> Spark runs in standalone cluster mode with three instances on AWS.
> We use 3 worker nodes with at all 23 executors.
>
> We start the app with these parameters:
> --conf "spark.akka.frameSize=160"
> --conf "spark.cleaner.referenceTracking.blocking=true"
> --conf "spark.cleaner.referenceTracking.blocking.shuffle=true"
> --conf "spark.cleaner.referenceTracking.cleanCheckpoints=true"
> --conf "spark.cleaner.ttl=600"
> --conf "spark.default.parallelism=69"
> --conf "spark.executor.cores=1"
> --conf "spark.executor.memory=7g"
> --conf "spark.kryoserializer.buffer.max=256m"
> --conf "spark.rrd.compress=true"
> --conf "spark.storage.memoryFraction=0.2"
> --conf "spark.streaming.backpressure.enabled=true"
> --conf "spark.streaming.kafka.maxRatePerPartition=75"
> --conf "spark.streaming.receiver.maxRate=15000"
> --conf "spark.streaming.stopGracefullyOnShutdown=true"
> --deploy-mode cluster
> --supervise
>
> Thanks,
> Uwe
>

Reply via email to