Got the exception when joining RDD with spark streamRDD

2015-06-18 Thread Groupme
Hi,


I am writing a PySpark streaming program. I have a training data set from
which I compute a regression model, and I want to test that model against
the streaming data set. So I join an RDD with the stream RDD, but I get an
exception. My source code and the exception are below. Any help is
appreciated. Thanks


Regards,

Afancy




from __future__ import print_function

import sys,os,datetime

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.context import SQLContext
from pyspark.resultiterable import ResultIterable
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
import numpy as np
import statsmodels.api as sm


def splitLine(line, delimiter='|'):
    values = line.split(delimiter)
    st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
    return (values[0], st.hour), values[2:]

def reg_m(y, x):
    ones = np.ones(len(x[0]))
    X = sm.add_constant(np.column_stack((x[0], ones)))
    for ele in x[1:]:
        X = sm.add_constant(np.column_stack((ele, X)))
    results = sm.OLS(y, X).fit()
    return results

def train(line):
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    i = 0
    for reading, temperature in line[1]:
        # every fourth record: emit one response and the three preceding
        # readings, plus piecewise temperature terms around 20, 16 and 5 degrees
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i = i + 1
    return str(line[0]), reg_m(y, x).params.tolist()

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: regression.py checkpointDir trainingDataDir streamDataDir",
              file=sys.stderr)
        exit(-1)

    checkpoint, trainingInput, streamInput = sys.argv[1:]
    sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

    # train one regression model per key from the static training data
    trainingLines = sc.textFile(trainingInput)
    modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
                            .groupByKey()\
                            .map(lambda line: train(line))\
                            .cache()

    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(checkpoint)
    lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))

    # join each windowed batch of the stream with the trained models
    testRDD = lines.groupByKeyAndWindow(4, 2)\
                   .map(lambda line: (str(line[0]), line[1]))\
                   .transform(lambda rdd: rdd.leftOuterJoin(modelRDD))
    testRDD.pprint(20)

    ssc.start()
    ssc.awaitTermination()
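
For reference, I launch the job with spark-submit and the three directories as
arguments, roughly spark-submit regression.py checkpointDir trainingDataDir
streamDataDir (actual paths omitted here).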




15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to 6 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
Traceback (most recent call last):
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 90, in dumps
    return bytearray(self.serializer.dumps((func.func, func.deserializers)))
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 622, in dumps
    cp.dump(obj)
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
    self.save_function_tuple(obj)
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function

Re: Got the exception when joining RDD with spark streamRDD

2015-06-18 Thread Davies Liu
This seems to be a bug, could you file a JIRA for it?

An RDD should be serializable for a Streaming job.
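
In the meantime, a possible workaround (just a rough sketch, not tested against
this exact job; helper names like modelMap and attachModel are illustrative
only) is to avoid capturing modelRDD inside the DStream closure at all:
collect the trained parameters to the driver, broadcast them, and emulate the
left outer join with a plain map over the stream, something like

    # sketch: broadcast the trained model instead of joining against modelRDD
    # inside transform(), so no RDD reference ends up in the serialized
    # streaming function (names below are illustrative)
    modelMap = dict(modelRDD.collect())      # {key: regression params}
    modelBC = sc.broadcast(modelMap)

    def attachModel(kv):
        key, readings = kv
        # None when the key was never seen in training (like a left outer join)
        return key, (readings, modelBC.value.get(key))

    testRDD = lines.groupByKeyAndWindow(4, 2)\
                   .map(lambda line: (str(line[0]), line[1]))\
                   .map(attachModel)
    testRDD.pprint(20)

This only works if the trained parameters fit comfortably in driver memory,
but it sidesteps pickling the RDD reference.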

On Thu, Jun 18, 2015 at 4:25 AM, Groupme grou...@gmail.com wrote: