Hi, I tried a similar question before and didn't get any answers, so I'll
try again:

I am using updateStateByKey, pretty much exactly as shown in the examples
shipping with Spark:

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._

 def createContext(master: String, dropDir: String, checkpointDirectory: String): StreamingContext = {
    // Fold each batch's counts for a key into its running total.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf()
      .setMaster(master)
      .setAppName("StatefulNetworkWordCountNoMem")
      .setJars(StreamingContext.jarOfClass(this.getClass))

    val ssc = new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint(checkpointDirectory)

    val lines = ssc.textFileStream(dropDir)

    // Key on the first three tab-separated fields of each line.
    val wordDstream = lines.map { line =>
      val x = line.split("\t")
      ((x(0), x(1), x(2)), 1)
    }
    wordDstream.print()

    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    println("Printing stream")
    stateDstream.print()
    ssc
  }
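For what it's worth, my mental model of what updateStateByKey does per batch is the plain fold below (a standalone sketch with no Spark involved; `simulateUpdates` and the sample batches are my own names, not Spark API). The point is that every key seen so far is visited on every interval, even with an empty values list:

```scala
object StateSim {
  // Same update function as in the job above, minus Spark.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  // Apply updateFunc to every key seen so far, once per batch.
  // Keys absent from a batch are still visited with Seq.empty,
  // which is what updateStateByKey does on each interval.
  def simulateUpdates(batches: Seq[Map[String, Seq[Int]]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int]) { (state, batch) =>
      val keys = state.keySet ++ batch.keySet
      keys.flatMap { k =>
        updateFunc(batch.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
      }.toMap
    }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Map("a" -> Seq(1, 1), "b" -> Seq(1)),
      Map("a" -> Seq(1)),           // "b" is idle but its state is retained
      Map.empty[String, Seq[Int]]   // an empty batch still touches every key
    )
    println(simulateUpdates(batches)) // a ends up at 3, b at 1
  }
}
```

Since the update function never returns None, the state map can only grow, which is why I'd expect it to plateau at the key-space size rather than OOM.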

I am running this over a _static_ drop directory (i.e. all the files are
present at startup, nothing new is arriving).

The only difference is that I have a pretty wide key space -- about 440K
distinct keys, each under 20 characters long.

The code exactly as shown above runs for a while (about an hour) and then
the executors start dying with OOM exceptions. I tried executor memory from
512M to 2g (4 executors); the only difference is how long it takes to hit
the OOM. The only task that keeps executing is the take at DStream (line
586), which makes sense. What doesn't make sense is why the memory isn't
getting reclaimed.
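The only knob I've found for actually shrinking the state is the update function's return value: as I read the docs, returning None is supposed to drop the key entirely. A variant I've been considering (sketch only -- `dropFunc` is my name, the idleness test is my assumption, and I haven't verified it helps with the OOM):

```scala
object DropIdleKeys {
  // Like updateFunc above, but evict a key once a batch arrives with no
  // new values for it -- updateStateByKey removes keys whose update
  // function returns None.
  val dropFunc: (Seq[Int], Option[Int]) => Option[Int] = (values, state) =>
    if (values.isEmpty) None // idle key: discard its state
    else Some(values.sum + state.getOrElse(0))

  def main(args: Array[String]): Unit = {
    println(dropFunc(Seq(1, 2), Some(3))) // Some(6): counts accumulate
    println(dropFunc(Seq.empty, Some(3))) // None: key would be evicted
  }
}
```

Of course, with a static drop directory I'd expect the whole state to be evicted after the first interval, so this feels like a workaround rather than an explanation for the leak.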

What am I doing wrong? I'd like to use streaming but it seems that I can't
get a process with 0 incoming traffic to stay up. Any advice much
appreciated -- I'm sure someone on this list has managed to run a streaming
program for longer than an hour!
