Got an exception when joining an RDD with a Spark streaming RDD
Hi,

I am writing a PySpark streaming program. I have a training data set from which I compute a regression model, and I want to use the streaming data set to test that model. When I join the model RDD with the stream RDD, I get an exception. My source code and the exception follow. Any help is appreciated.

Thanks,
Regards,
Afancy

Source code:

from __future__ import print_function

import sys, os, datetime

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.context import SQLContext
from pyspark.resultiterable import ResultIterable
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
import numpy as np
import statsmodels.api as sm


def splitLine(line, delimiter='|'):
    values = line.split(delimiter)
    st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
    return (values[0], st.hour), values[2:]


def reg_m(y, x):
    ones = np.ones(len(x[0]))
    X = sm.add_constant(np.column_stack((x[0], ones)))
    for ele in x[1:]:
        X = sm.add_constant(np.column_stack((ele, X)))
    results = sm.OLS(y, X).fit()
    return results


def train(line):
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    i = 0
    for reading, temperature in line[1]:
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i += 1
    return str(line[0]), reg_m(y, x).params.tolist()


if __name__ == '__main__':
    if len(sys.argv) != 4:
        print("Usage: regression.py <checkpointDir> <trainingDataDir> <streamDataDir>", file=sys.stderr)
        exit(-1)
    checkpoint, trainingInput, streamInput = sys.argv[1:]
    sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")
    trainingLines = sc.textFile(trainingInput)
    modelRDD = trainingLines.map(lambda line: splitLine(line, '|')) \
        .groupByKey() \
        .map(lambda line: train(line)) \
        .cache()

    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(checkpoint)
    lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, '|'))
    testRDD = lines.groupByKeyAndWindow(4, 2) \
        .map(lambda line: (str(line[0]), line[1])) \
        .transform(lambda rdd: rdd.leftOuterJoin(modelRDD))
    testRDD.pprint(20)
    ssc.start()
    ssc.awaitTermination()

The exception:

15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to 6 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
Traceback (most recent call last):
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 90, in dumps
    return bytearray(self.serializer.dumps((func.func, func.deserializers)))
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 622, in dumps
    cp.dump(obj)
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
    self.save_function_tuple(obj)
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
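In case it helps to narrow this down, here is the relevant pattern from my job reduced to a minimal sketch (directory paths are placeholders; I have not run this exact snippet). The only unusual thing it does is reference a driver-side RDD from inside the DStream transform closure, which is the function that streaming/util.py is trying to pickle in the traceback:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", appName="TransformJoinSketch")

# Stands in for the trained model: a pair RDD of (key, params).
modelRDD = sc.parallelize([("k", [0.0, 1.0])])

ssc = StreamingContext(sc, 2)
ssc.checkpoint("/tmp/checkpoint")            # placeholder path

# The transform closure captures modelRDD; serializing this function
# for the streaming job pulls the captured RDD into the pickle.
stream = ssc.textFileStream("/tmp/stream")   # placeholder path
joined = stream.map(lambda l: (l, 1)) \
               .transform(lambda rdd: rdd.leftOuterJoin(modelRDD))
joined.pprint()

ssc.start()
ssc.awaitTermination()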
Re: Got an exception when joining an RDD with a Spark streaming RDD
This seems to be a bug; could you file a JIRA for it? An RDD should be serializable for a streaming job.

On Thu, Jun 18, 2015 at 4:25 AM, Groupme <grou...@gmail.com> wrote:

> Hi, I am writing a PySpark streaming program. I have a training data
> set from which I compute a regression model, and I want to use the
> streaming data set to test that model. When I join the model RDD with
> the stream RDD, I get an exception.
> [source code and traceback snipped; quoted in full above]
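Until that is fixed, one possible workaround (a sketch only, not tested against your job; names follow your script) is to keep the RDD out of the transform closure entirely: collect the fitted parameters to the driver, broadcast them, and attach them in a plain map, which yields the same (value, model-or-None) shape as the leftOuterJoin:

# Broadcast-based sketch; assumes the fitted model is small enough
# to collect to the driver.
modelMap = sc.broadcast(modelRDD.collectAsMap())    # {key: params}

lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, '|'))
testStream = lines.groupByKeyAndWindow(4, 2) \
    .map(lambda kv: (str(kv[0]),
                     (list(kv[1]), modelMap.value.get(str(kv[0])))))
testStream.pprint(20)

A broadcast variable is serialized once and shipped to the executors, so nothing in the DStream closures needs to pickle an RDD.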