[ https://issues.apache.org/jira/browse/SPARK-31795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117480#comment-17117480 ]
Hyukjin Kwon commented on SPARK-31795:
--------------------------------------

Please take a look at https://spark.apache.org/community.html

> Stream Data with API to ServiceNow
> ----------------------------------
>
>                 Key: SPARK-31795
>                 URL: https://issues.apache.org/jira/browse/SPARK-31795
>             Project: Spark
>          Issue Type: Question
>          Components: DStreams
>    Affects Versions: 2.4.5
>            Reporter: Dominic Wetenkamp
>            Priority: Major
>
> 1) Create the class
> 2) Instantiate the class
> 3) Set up the stream
> 4) Write the stream. Here I get a pickling error; I don't know how to make it work without the error.
>
> import json
> import requests
> from pyspark.sql.functions import col
>
> class CMDB:
>     # Public properties
>     @property
>     def streamDF(self):
>         return spark.readStream.table(self.__source_table)
>
>     # Constructor
>     def __init__(self, destination_table, source_table):
>         self.__destination_table = destination_table
>         self.__source_table = source_table
>
>     # Private methods
>     def __processRow(self, row):
>         # API connection info
>         url = ('https://foo.service-now.com/api/now/table/'
>                + self.__destination_table + '?sysparm_display_value=true')
>         user = 'username'
>         password = 'psw'
>         headers = {"Content-Type": "application/json", "Accept": "application/json"}
>         response = requests.post(url, auth=(user, password), headers=headers,
>                                  data=json.dumps(row.asDict()))
>         return response
>
>     # Public methods
>     def uploadStreamDF(self, df):
>         return df.writeStream.foreach(self.__processRow).trigger(once=True).start()
>
> ################################################################################
>
> cmdb = CMDB('destination_table_name', 'source_table_name')
> streamDF = (cmdb.streamDF
>     .withColumn('object_id', col('source_column_id'))
>     .withColumn('name', col('source_column_name'))
> ).select('object_id', 'name')
>
> # Setting up the stream works; I am able to display the data.
> cmdb.uploadStreamDF(streamDF)
> # cmdb.uploadStreamDF(streamDF) fails with:
> # PicklingError: Could not serialize object: Exception: It appears that you
> # are attempting to reference SparkContext from a broadcast variable, action,
> # or transformation. SparkContext can only be used on the driver, not in code
> # that it run on workers. For more information, see SPARK-5063.
>
> See the exception below:
> '''
> Exception                                 Traceback (most recent call last)
> /databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
>     704         try:
> --> 705             return cloudpickle.dumps(obj, 2)
>     706         except pickle.PickleError:
> /databricks/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
>     862     cp = CloudPickler(file, protocol)
> --> 863     cp.dump(obj)
>     864     return file.getvalue()
> /databricks/spark/python/pyspark/cloudpickle.py in dump(self, obj)
>     259         try:
> --> 260             return Pickler.dump(self, obj)
>     261         except RuntimeError as e:
> /databricks/python/lib/python3.7/pickle.py in dump(self, obj)
>     436             self.framer.start_framing()
> --> 437         self.save(obj)
>     438         self.write(STOP)
> '''

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
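[Editor's note] A likely cause, per the SPARK-5063 message in the traceback: `foreach(self.__processRow)` passes a bound method, so cloudpickle must serialize the whole `CMDB` instance and its class, and the class's `streamDF` property references the driver-side `spark` session, which cannot be shipped to executors. One common workaround is to give `foreach` a plain function that closes over only plain strings. The sketch below is one possible restructuring, not the reporter's code; the URL, table name, and credentials are placeholders carried over from the question.

```python
def make_row_processor(destination_table):
    """Build a per-row callback for writeStream.foreach().

    Everything captured here is a plain string, so cloudpickle can
    serialize the returned closure without dragging in the SparkSession.
    """
    url = ('https://foo.service-now.com/api/now/table/'
           + destination_table + '?sysparm_display_value=true')
    user = 'username'
    password = 'psw'
    headers = {"Content-Type": "application/json",
               "Accept": "application/json"}

    def process_row(row):
        # Imported inside the function so the modules are loaded on the
        # executor, where this closure actually runs.
        import json
        import requests
        return requests.post(url, auth=(user, password), headers=headers,
                             data=json.dumps(row.asDict()))

    return process_row

# On the driver (sketch):
# streamDF.writeStream.foreach(make_row_processor('destination_table_name')) \
#     .trigger(once=True).start()
```

With this shape, `uploadStreamDF` no longer needs to reference `self` inside the callback, so the `CMDB` class can keep its `streamDF` property on the driver side untouched.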