Hey Andrey,

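`ConstantInputDStream` doesn't support checkpointing because it holds an RDD
as a field, so a job built on it cannot resume from a checkpoint. That is why
your job runs on a fresh start but fails once a checkpoint already exists.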
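
One possible workaround for a test like yours is a small input stream that
keeps the raw data and builds a new RDD for every batch, instead of holding an
RDD. This is only a sketch: `SeqInputDStream` is a hypothetical name, not
something Spark ships, and I have not run it against your job. It assumes the
elements of `data` are serializable.

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical helper (not part of Spark): stores the plain data rather than
// an RDD, so the stream holds nothing tied to one particular SparkContext.
class SeqInputDStream[T: ClassTag](streamingContext: StreamingContext, data: Seq[T])
  extends InputDStream[T](streamingContext) {

  // Nothing to set up or tear down for an in-memory source.
  override def start(): Unit = {}
  override def stop(): Unit = {}

  // Build a fresh RDD for each batch from the context currently attached
  // to this stream.
  override def compute(validTime: Time): Option[RDD[T]] =
    Some(context.sparkContext.parallelize(data))
}
```

In your `createContext` that would replace the `inputRdd` and
`ConstantInputDStream` lines with `val testStream = new SeqInputDStream(ssc, input)`.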

On Mon, Jan 25, 2016 at 1:12 PM, Andrey Yegorov <andrey.yego...@gmail.com>
wrote:

> Hi,
>
> I am new to Spark (and Scala) and hope someone can help me with an issue
> I got stuck on in my experiments/learning.
>
> mapWithState from Spark 1.6 seems to be a great fit for the task I want to
> implement, but unfortunately I am getting the error "RDD transformations
> and actions can only be invoked by the driver, not inside of other
> transformations" on job restart when a checkpoint already exists.
> The job starts and works fine if the checkpoint directory is empty (which
> kind of defeats the point of having a checkpoint).
>
> I can reproduce it with ~65 lines of test code, see below.
> Is there something that I am doing wrong there?
>
> code:
> ----
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.dstream.ConstantInputDStream
> import org.apache.spark.streaming.{Durations, StreamingContext, _}
>
> object TestJob {
>   def stateFunc(id: String,
>                 txt: Option[Iterable[String]],
>                 state: State[String]) : (String, Long) = {
>     if (txt.nonEmpty) {
>       val aggregatedString = state.getOption().getOrElse("") + txt
>       state.update(aggregatedString)
>       (id, aggregatedString.length)
>     } else { // happens when state is timing out? any other cases?
>       (id, 0)
>     }
>   }
>
>   def createContext(checkpointDirectory: String): StreamingContext = {
>     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("test")
>
>     val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
>     ssc.checkpoint(checkpointDirectory)
>
>     val input = Seq("1", "21", "321", "41", "42", "543", "67")
>     val inputRdd = ssc.sparkContext.parallelize(input)
>     val testStream = new ConstantInputDStream(ssc, inputRdd)
>
>     val streamWithIds = testStream.map(x => (x.substring(0,1), x))
>     val batched = streamWithIds.groupByKey()
>
>     val stateSpec = StateSpec.function(stateFunc _)
>       .numPartitions(3)
>       .timeout(Minutes(3 * 60 * 24)) // 3 days
>
>     val result = batched.mapWithState(stateSpec)
>     result.print
>     ssc
>   }
>
>   def main(args: Array[String]): Unit = {
>     val checkpointDirectory = com.google.common.io.Files.createTempDir()
>     checkpointDirectory.deleteOnExit()
>     val checkpointDirectoryName = checkpointDirectory.getAbsolutePath
>
>     val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
>       () => {
>         createContext(checkpointDirectoryName)
>       })
>
>     ssc.start()
>     ssc.awaitTerminationOrTimeout(7000)
>     ssc.stop()
>     Thread.sleep(5000)
>
>     val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
>       () => {
>         createContext(checkpointDirectoryName)
>       })
>
>     // terminates here with
>     // Exception in thread "main" org.apache.spark.SparkException: RDD
>     // transformations and actions can only be invoked by the driver, not
>     // inside of other transformations; for example,
>     // rdd1.map(x => rdd2.values.count() * x) is invalid because the values
>     // transformation and count action cannot be performed inside of the
>     // rdd1.map transformation. For more information, see SPARK-5063.
>     ssc2.start()
>     ssc2.awaitTerminationOrTimeout(7000)
>     ssc2.stop()
>   }
> }
>
> ----------
> Andrey Yegorov
>
