[ https://issues.apache.org/jira/browse/SPARK-8509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14595163#comment-14595163 ]
Joseph K. Bradley commented on SPARK-8509: ------------------------------------------ I believe the bug is here, where you'll need to be careful about what is a DStream vs RDD vs local data: {code} testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), line[1])).transform(lambda rdd: rdd.leftOuterJoin(modelRDD)) testRDD.pprint(20) {code} I believe this is a bug in your code, not in Spark, so please post on the user list instead of JIRA. I'll close this for now. > Failed to JOIN in pyspark > ------------------------- > > Key: SPARK-8509 > URL: https://issues.apache.org/jira/browse/SPARK-8509 > Project: Spark > Issue Type: Bug > Reporter: afancy > > Hi, > I am writing pyspark stream program. I have the training data set to compute > the regression model. I want to use the stream data set to test the model. > So, I join with RDD with the StreamRDD, but i got the exception. Following > are my source code, and the exception I got. Any help is appreciated. Thanks > Regards, > Afancy > -------------------- > {code} > from __future__ import print_function > import sys,os,datetime > from pyspark import SparkContext > from pyspark.streaming import StreamingContext > from pyspark.sql.context import SQLContext > from pyspark.resultiterable import ResultIterable > from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD > import numpy as np > import statsmodels.api as sm > def splitLine(line, delimiter='|'): > values = line.split(delimiter) > st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S') > return (values[0],st.hour), values[2:] > def reg_m(y, x): > ones = np.ones(len(x[0])) > X = sm.add_constant(np.column_stack((x[0], ones))) > for ele in x[1:]: > X = sm.add_constant(np.column_stack((ele, X))) > results = sm.OLS(y, X).fit() > return results > def train(line): > y,x = [],[] > y, x = [],[[],[],[],[],[],[]] > reading_tmp,temp_tmp = [],[] > i = 0 > for reading, temperature in line[1]: > if i%4==0 and len(reading_tmp)==4: > y.append(reading_tmp.pop()) > x[0].append(reading_tmp.pop()) > x[1].append(reading_tmp.pop()) > x[2].append(reading_tmp.pop()) > temp = float(temp_tmp[0]) > del temp_tmp[:] > x[3].append(temp-20.0 if temp>20.0 else 0.0) > x[4].append(16.0-temp if temp<16.0 else 0.0) > x[5].append(5.0-temp if temp<5.0 else 0.0) > reading_tmp.append(float(reading)) > temp_tmp.append(float(temperature)) > i = i + 1 > return str(line[0]),reg_m(y, x).params.tolist() > if __name__ == "__main__": > if len(sys.argv) != 4: > print("Usage: regression.py <checkpointDir> <trainingDataDir> > <streamDataDir>", file=sys.stderr) > exit(-1) > checkpoint, trainingInput, streamInput = sys.argv[1:] > sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming") > trainingLines = sc.textFile(trainingInput) > modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\ > .groupByKey()\ > .map(lambda line: train(line))\ > .cache() > ssc = StreamingContext(sc, 2) > ssc.checkpoint(checkpoint) > lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, > "|")) > testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), > line[1])).transform(lambda rdd: rdd.leftOuterJoin(modelRDD)) > testRDD.pprint(20) > ssc.start() > ssc.awaitTermination() > {code} > ------------------------ > {code} > 15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to > 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6 > Traceback (most recent call last): > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", > line 90, in dumps > return bytearray(self.serializer.dumps((func.func, func.deserializers))) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", > line 427, in dumps > return cloudpickle.dumps(obj, 2) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 622, in dumps > cp.dump(obj) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 107, in dump > return Pickler.dump(self, obj) > File "/usr/lib/python2.7/pickle.py", line 224, in dump > self.save(obj) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple > save(element) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 193, in save_function > self.save_function_tuple(obj) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 236, in save_function_tuple > save((code, closure, base_globals)) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple > save(element) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 600, in save_list > self._batch_appends(iter(obj)) > File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends > save(x) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 193, in save_function > self.save_function_tuple(obj) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 236, in save_function_tuple > save((code, closure, base_globals)) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple > save(element) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 600, in save_list > self._batch_appends(iter(obj)) > File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends > save(tmp[0]) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 193, in save_function > self.save_function_tuple(obj) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 241, in save_function_tuple > save(f_globals) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 649, in save_dict > self._batch_setitems(obj.iteritems()) > File "/usr/lib/python2.7/pickle.py", line 686, in _batch_setitems > save(v) > File "/usr/lib/python2.7/pickle.py", line 306, in save > rv = reduce(self.proto) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line > 193, in __getnewargs__ > "It appears that you are attempting to broadcast an RDD or reference an > RDD from an " > Exception: It appears that you are attempting to broadcast an RDD or > reference an RDD from an action or transformation. RDD transformations and > actions can only be invoked by the driver, not inside of other > transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is > invalid because the values transformation and count action cannot be > performed inside of the rdd1.map transformation. For more information, see > SPARK-5063. > 15/06/18 12:25:37 ERROR StreamingContext: Error starting the context, marking > it as stopped > java.io.IOException: java.lang.NullPointerException > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242) > at > org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > at > org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285) > at > org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114) > at > org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547) > at > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587) > at > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586) > at > org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) > at py4j.Gateway.invoke(Gateway.java:259) > at > py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:207) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > ... 61 more > Traceback (most recent call last): > File "/home/xiuli/PycharmProjects/benchmark/regression.py", line 103, in > <module> > ssc.start() > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py", > line 184, in start > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", > line 538, in __call__ > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", > line 300, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o42.start. > : java.io.IOException: java.lang.NullPointerException > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242) > at > org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > at > org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285) > at > org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114) > at > org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547) > at > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587) > at > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586) > at > org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) > at py4j.Gateway.invoke(Gateway.java:259) > at > py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:207) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > ... 61 more > 15/06/18 12:25:37 INFO SparkContext: Invoking stop() from shutdown hook > 15/06/18 12:25:37 INFO SparkUI: Stopped Spark web UI at > http://10.41.27.11:4040 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org