[ https://issues.apache.org/jira/browse/SPARK-27894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254196#comment-17254196 ]

vinay badhan commented on SPARK-27894:
--------------------------------------

What you can do is create the SparkContext object from `rdd.context` inside the foreachRDD method, and if you need a SparkSession object you can do `spark = SQLContext(rdd.context).sparkSession`.

> PySpark streaming transform RDD join does not work when checkpoint is enabled
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-27894
>                 URL: https://issues.apache.org/jira/browse/SPARK-27894
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Jeffrey(Xilang) Yan
>            Priority: Major
>
> In PySpark Streaming, if checkpointing is enabled and there is a transform-join
> operation, the following error is thrown.
> {code:python}
> sc = SparkContext(appName='xxxx')
> sc.setLogLevel("WARN")
> ssc = StreamingContext(sc, 10)
> ssc.checkpoint("hdfs://xxxx/test")
> kafka_bootstrap_servers = "xxxx"
> topics = ['xxxx', 'xxxx']
> doc_info = sc.parallelize(((1, 2), (4, 5), (7, 8), (10, 11)))
> kvds = KafkaUtils.createDirectStream(ssc, topics,
>     kafkaParams={"metadata.broker.list": kafka_bootstrap_servers})
> line = kvds.map(lambda x: (1, 2))
> line.transform(lambda rdd: rdd.join(doc_info)).pprint(10)
> ssc.start()
> ssc.awaitTermination()
> {code}
>
> Error details:
> {code:java}
> PicklingError: Could not serialize object: Exception: It appears that you are
> attempting to broadcast an RDD or reference an RDD from an action or
> transformation. RDD transformations and actions can only be invoked by the
> driver, not inside of other transformations; for example, rdd1.map(lambda x:
> rdd2.values.count() * x) is invalid because the values transformation and
> count action cannot be performed inside of the rdd1.map transformation. For
> more information, see SPARK-5063.
> {code}
> The same code works fine in Scala.
> And if we remove either
> {code:python}
> ssc.checkpoint("hdfs://xxxx/test")
> {code}
> or
> {code:python}
> line.transform(lambda rdd: rdd.join(doc_info))
> {code}
> there is no error either.
>
> It seems that when checkpointing is enabled, PySpark serializes the transform
> lambda, and with it the RDD captured by that lambda; since an RDD cannot be
> serialized, the error is raised.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
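The comment's suggestion, combined with the usual SPARK-5063 fix, can be sketched as below. This is a minimal, hypothetical sketch, not a confirmed resolution of the ticket: it broadcasts the small static dataset instead of closing over the `doc_info` RDD (so the lambda no longer captures an RDD), and rebuilds driver-side objects from `rdd.context` inside `foreachRDD` as the commenter describes. Broker, topics, and checkpoint path are placeholders; the function assumes Spark 2.x with the `spark-streaming-kafka-0-8` connector and a reachable broker, so it is only defined here, not run.

```python
def run_job(brokers, topics, checkpoint_dir):
    """Hypothetical workaround sketch for SPARK-27894; requires pyspark 2.x
    with the Kafka 0.8 connector and a live broker, so it is defined but
    not executed in this message."""
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName='transform-join-workaround')
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint(checkpoint_dir)

    # Broadcast the small static dataset instead of joining against an RDD
    # inside transform(): a broadcast variable is serializable, an RDD is not.
    doc_info = sc.broadcast(dict([(1, 2), (4, 5), (7, 8), (10, 11)]))

    kvds = KafkaUtils.createDirectStream(
        ssc, topics, kafkaParams={"metadata.broker.list": brokers})
    line = kvds.map(lambda x: (1, 2))

    # Map-side join against the broadcast dict; the lambda captures no RDD,
    # so checkpointing can pickle it.
    joined = line.map(lambda kv: (kv[0], (kv[1], doc_info.value.get(kv[0]))))
    joined.pprint(10)

    def handle(rdd):
        # Per the comment above: recover driver objects from rdd.context
        # rather than referencing the outer sc/session in the closure.
        spark = SQLContext(rdd.context).sparkSession
        if not rdd.isEmpty():
            print(spark.sparkContext.applicationId, rdd.count())

    joined.foreachRDD(handle)

    ssc.start()
    ssc.awaitTermination()
```

Note that recreating the session via `SQLContext(rdd.context).sparkSession` also matters after a restart from checkpoint, when objects captured at definition time are stale.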