Hi, I tried a similar question before and didn't get any answers, so I'll try again:
I am using updateStateByKey, pretty much exactly as shown in the examples shipping with Spark:

```scala
def createContext(master: String, dropDir: String, checkpointDirectory: String) = {
  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.sum
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  val sparkConf = new SparkConf()
    .setMaster(master)
    .setAppName("StatefulNetworkWordCountNoMem")
    .setJars(StreamingContext.jarOfClass(this.getClass))
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint(checkpointDirectory)

  val lines = ssc.textFileStream(dropDir)
  val wordDstream = lines.map { line =>
    val x = line.split("\t")
    ((x(0), x(1), x(2)), 1)
  }
  wordDstream.print()

  val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
  println("Printing stream")
  stateDstream.print()
  ssc
}
```

I am running this over a _static_ drop directory (i.e. the files are all there at the beginning of time, and nothing new is coming in). The only difference from the shipped example is that I have a pretty wide key space -- about 440K keys, each under 20 characters long.

The code exactly as shown above runs for a while (about an hour) and then the executors start dying with OOM exceptions. I tried executor memory from 512M to 2g (4 executors) -- the only difference is how long it takes for the OOM to appear. The only task that keeps executing is take at DStream (line 586), which makes sense. What doesn't make sense is why the memory isn't getting reclaimed.

What am I doing wrong? I'd like to use streaming, but it seems that I can't get a process with zero incoming traffic to stay up. Any advice much appreciated -- I'm sure someone on this list has managed to run a streaming program for longer than an hour!
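For context on why the state might never shrink: updateStateByKey keeps a key in the state RDD for as long as the update function returns Some, and drops it only when the function returns None. The update function above always returns Some, so every one of the 440K keys stays in state forever. Below is a minimal, Spark-free sketch of an evicting variant; the name EvictingUpdate, the (count, idleBatches) state shape, and the maxIdle threshold are all my own illustrative assumptions, not anything from the original post.

```scala
object EvictingUpdate {
  // State carries (runningCount, batchesSinceLastSeen). After maxIdle
  // consecutive batches with no new values for a key, the function
  // returns None, which is the signal updateStateByKey uses to evict
  // that key from state. The threshold of 3 is arbitrary.
  val maxIdle = 3

  def updateFunc(values: Seq[Int], state: Option[(Int, Int)]): Option[(Int, Int)] = {
    val (count, idle) = state.getOrElse((0, 0))
    if (values.nonEmpty)
      Some((count + values.sum, 0))      // new data: update count, reset idle counter
    else if (idle + 1 >= maxIdle)
      None                               // idle too long: drop this key's state
    else
      Some((count, idle + 1))            // quiet batch: keep count, bump idle counter
  }
}
```

Whether this helps depends on whether the workload actually tolerates forgetting idle keys; it does not explain memory growth from lineage or checkpointing, only growth from state that is never released.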