Hi, my code is below:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
def test(record_list): print(list(record_list)) return record_list def functionToCreateContext(): conf = SparkConf().setAppName("model_event").setMaster("spark://172.22.9.181:7077") \ .set("spark.executor.memory", '6g') \ .set("spark.executor.cores", '8') \ .set("spark.deploy.defaultCores", '8') \ .set("spark.cores.max", '16') \ .set("spark.streaming.kafka.maxRatePerPartition", 1) \ .set("spark.streaming.blockInterval", 1) \ .set("spark.default.parallelism", 8) \ .set("spark.driver.host", '172.22.9.181') \ sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 5) ssc.checkpoint("/spark/checkpoints/model_event_spark") return ssc if __name__ == '__main__': ssc = StreamingContext.getOrCreate("/spark/checkpoints/model_event_spark", functionToCreateContext) record_dstream = KafkaUtils.createDirectStream(ssc,topics=["installmentdb_t_bill"], kafkaParams={"bootstrap.servers":"xxx:9092", "auto.offset.reset":"smallest", }, ) record_dstream.checkpoint(5).mapPartitions(test).pprint() ssc.start() ssc.awaitTermination() When the scripts starts at the first time,it work well. 
But second time started from checkpointDirectory,it has problem like: 2019-07-30 02:48:50,290 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped org.apache.spark.SparkException: org.apache.spark.streaming.api.python.PythonTransformedDStream@319b7bed has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229) at 
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578) at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) What is wrong with my script?