[ 
https://issues.apache.org/jira/browse/SPARK-27894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254196#comment-17254196
 ] 

vinay badhan commented on SPARK-27894:
--------------------------------------

What you can do is that in the foreachRDD method, 
create spark_context object from `rdd.context`
and if you need spark session object then you can do
`spark = SQLContext(rdd.context).sparkSession`

> PySpark streaming transform RDD join not works when checkpoint enabled
> ----------------------------------------------------------------------
>
>                 Key: SPARK-27894
>                 URL: https://issues.apache.org/jira/browse/SPARK-27894
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Jeffrey(Xilang) Yan
>            Priority: Major
>
> In PySpark Steaming, if checkpoint enabled and there is a transform-join 
> operation, the error thrown.
> {code:java}
> sc=SparkContext(appName='xxxx')
> sc.setLogLevel("WARN")
> ssc=StreamingContext(sc,10)
> ssc.checkpoint("hdfs://xxxx/test")
> kafka_bootstrap_servers="xxxx"
> topics = ['xxxx', 'xxxx']
> doc_info = sc.parallelize(((1, 2), (4, 5), (7, 8), (10, 11)))
> kvds=KafkaUtils.createDirectStream(ssc, topics, 
> kafkaParams={"metadata.broker.list": kafka_bootstrap_servers})
> line=kvds.map(lambda x:(1,2))
> line.transform(lambda rdd:rdd.join(doc_info)).pprint(10)
> ssc.start()
> ssc.awaitTermination()
> {code}
>  
> Error details:
> {code:java}
> PicklingError: Could not serialize object: Exception: It appears that you are 
> attempting to broadcast an RDD or reference an RDD from an action or 
> transformation. RDD transformations and actions can only be invoked by the 
> driver, not inside of other transformations; for example, rdd1.map(lambda x: 
> rdd2.values.count() * x) is invalid because the values transformation and 
> count action cannot be performed inside of the rdd1.map transformation. For 
> more information, see SPARK-5063.
> {code}
> The similar code works great in Scala. And if we remove any of
> {code:java}
> ssc.checkpoint("hdfs://xxxx/test") 
> {code}
> or
> {code:java}
> line.transform(lambda rdd:rdd.join(doc_info))
> {code}
> There is no error either.
>  
> It seems that when checkpoint is enabled, pyspark will serialize transform 
> lambda, and then the RDD used by lambda, while RDD cannot be serialize so the 
> error prompted.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to