Jeffrey(Xilang) Yan created SPARK-27894: -------------------------------------------
         Summary: PySpark streaming transform RDD join does not work when checkpointing is enabled
             Key: SPARK-27894
             URL: https://issues.apache.org/jira/browse/SPARK-27894
         Project: Spark
      Issue Type: Bug
      Components: PySpark
Affects Versions: 2.4.0
        Reporter: Jeffrey(Xilang) Yan

In PySpark Streaming, if checkpointing is enabled and the pipeline contains a transform-join operation, the following error is thrown.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName='xxxx')
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("hdfs://xxxx/test")
    kafka_bootstrap_servers = "xxxx"
    topics = ['xxxx', 'xxxx']
    # Static lookup RDD that the streaming transform joins against
    doc_info = sc.parallelize(((1, 2), (4, 5), (7, 8), (10, 11)))
    kvds = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": kafka_bootstrap_servers})
    line = kvds.map(lambda x: (1, 2))
    line.transform(lambda rdd: rdd.join(doc_info)).pprint(10)
    ssc.start()
    ssc.awaitTermination()

Error details:

    PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

Similar code works fine in Scala. The error also disappears if either of the following lines is removed:

    ssc.checkpoint("hdfs://xxxx/test")
    line.transform(lambda rdd: rdd.join(doc_info))

It seems that when checkpointing is enabled, PySpark serializes the transform lambda, and with it the RDD referenced inside the lambda; since an RDD cannot be serialized, the error is raised.
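
For anyone hitting this before a fix lands, one possible workaround is to avoid referencing an RDD inside the transform closure altogether. The sketch below is illustrative only and not part of this report: it reuses sc, ssc, and line from the snippet above, assumes the lookup data is small enough to hold in driver memory, and replaces the RDD-to-RDD join with a map-side lookup against a plain Python dict, which pickles cleanly when the checkpointed lambda is serialized.

    # Workaround sketch (hypothetical names): keep the lookup data as a plain
    # dict on the driver instead of an RDD, so the transform closure contains
    # no RDD reference and can be pickled for checkpointing.
    doc_info_map = dict(((1, 2), (4, 5), (7, 8), (10, 11)))

    def join_with_lookup(rdd):
        # Emulates rdd.join(doc_info): emit (key, (left_value, right_value))
        # pairs, keeping only keys present in the lookup table.
        return (rdd.map(lambda kv: (kv[0], (kv[1], doc_info_map.get(kv[0]))))
                   .filter(lambda kv: kv[1][1] is not None))

    line.transform(join_with_lookup).pprint(10)

This sidesteps the pickling problem because only ordinary Python objects are captured by the closure; whether it is acceptable depends on the size of the lookup data set.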