[ 
https://issues.apache.org/jira/browse/SPARK-8509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14595163#comment-14595163
 ] 

Joseph K. Bradley commented on SPARK-8509:
------------------------------------------

I believe the bug is here, where you'll need to be careful about what is a 
DStream vs RDD vs local data:
{code}
testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), 
line[1])).transform(lambda rdd:  rdd.leftOuterJoin(modelRDD))
testRDD.pprint(20)
{code}

I believe this is a bug in your code, not in Spark, so please post on the user 
list instead of JIRA.  I'll close this for now.

> Failed to JOIN in pyspark
> -------------------------
>
>                 Key: SPARK-8509
>                 URL: https://issues.apache.org/jira/browse/SPARK-8509
>             Project: Spark
>          Issue Type: Bug
>            Reporter: afancy
>
> Hi,
> I am writing pyspark stream program. I have the training data set to compute 
> the regression model. I want to use the stream data set to test the model. 
> So, I join with RDD with the StreamRDD, but i got the exception. Following 
> are my source code, and the exception I got. Any help is appreciated. Thanks
> Regards,
> Afancy
> --------------------
> {code}
> from __future__ import print_function
> import sys,os,datetime
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.sql.context import SQLContext
> from pyspark.resultiterable import ResultIterable
> from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
> import numpy as np
> import statsmodels.api as sm
> def splitLine(line, delimiter='|'):
>     values = line.split(delimiter)
>     st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
>     return (values[0],st.hour), values[2:]
> def reg_m(y, x):
>     ones = np.ones(len(x[0]))
>     X = sm.add_constant(np.column_stack((x[0], ones)))
>     for ele in x[1:]:
>         X = sm.add_constant(np.column_stack((ele, X)))
>     results = sm.OLS(y, X).fit()
>     return results
> def train(line):
>     y,x = [],[]
>     y, x = [],[[],[],[],[],[],[]]
>     reading_tmp,temp_tmp = [],[]
>     i = 0
>     for reading, temperature in line[1]:
>         if i%4==0 and len(reading_tmp)==4:
>             y.append(reading_tmp.pop())
>             x[0].append(reading_tmp.pop())
>             x[1].append(reading_tmp.pop())
>             x[2].append(reading_tmp.pop())
>             temp = float(temp_tmp[0])
>             del temp_tmp[:]
>             x[3].append(temp-20.0 if temp>20.0 else 0.0)
>             x[4].append(16.0-temp if temp<16.0 else 0.0)
>             x[5].append(5.0-temp if temp<5.0 else 0.0)
>         reading_tmp.append(float(reading))
>         temp_tmp.append(float(temperature))
>         i = i + 1
>     return str(line[0]),reg_m(y, x).params.tolist()
> if __name__ == "__main__":
>     if len(sys.argv) != 4:
>         print("Usage: regression.py <checkpointDir> <trainingDataDir> 
> <streamDataDir>", file=sys.stderr)
>         exit(-1)
>     checkpoint, trainingInput, streamInput = sys.argv[1:]
>     sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")
>     trainingLines = sc.textFile(trainingInput)
>     modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
>                                 .groupByKey()\
>                                 .map(lambda line: train(line))\
>                                 .cache()
>     ssc = StreamingContext(sc, 2)
>     ssc.checkpoint(checkpoint)
>     lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, 
> "|"))
>     testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), 
> line[1])).transform(lambda rdd:  rdd.leftOuterJoin(modelRDD))
>     testRDD.pprint(20)
>     ssc.start()
>     ssc.awaitTermination()
> {code}
> ------------------------
> {code}
> 15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to 
> 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
> Traceback (most recent call last):
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py",
>  line 90, in dumps
>     return bytearray(self.serializer.dumps((func.func, func.deserializers)))
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
>  line 427, in dumps
>     return cloudpickle.dumps(obj, 2)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 622, in dumps
>     cp.dump(obj)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 107, in dump
>     return Pickler.dump(self, obj)
>   File "/usr/lib/python2.7/pickle.py", line 224, in dump
>     self.save(obj)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 193, in save_function
>     self.save_function_tuple(obj)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 236, in save_function_tuple
>     save((code, closure, base_globals))
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
>     save(x)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 193, in save_function
>     self.save_function_tuple(obj)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 236, in save_function_tuple
>     save((code, closure, base_globals))
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
>     save(tmp[0])
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 193, in save_function
>     self.save_function_tuple(obj)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 241, in save_function_tuple
>     save(f_globals)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
>     self._batch_setitems(obj.iteritems())
>   File "/usr/lib/python2.7/pickle.py", line 686, in _batch_setitems
>     save(v)
>   File "/usr/lib/python2.7/pickle.py", line 306, in save
>     rv = reduce(self.proto)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 
> 193, in __getnewargs__
>     "It appears that you are attempting to broadcast an RDD or reference an 
> RDD from an "
> Exception: It appears that you are attempting to broadcast an RDD or 
> reference an RDD from an action or transformation. RDD transformations and 
> actions can only be invoked by the driver, not inside of other 
> transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is 
> invalid because the values transformation and count action cannot be 
> performed inside of the rdd1.map transformation. For more information, see 
> SPARK-5063.
> 15/06/18 12:25:37 ERROR StreamingContext: Error starting the context, marking 
> it as stopped
> java.io.IOException: java.lang.NullPointerException
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
>         at 
> org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>         at 
> org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>         at 
> org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
>         at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
>         at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>         at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>         at 
> org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>         at py4j.Gateway.invoke(Gateway.java:259)
>         at 
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>         at 
> org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>         ... 61 more
> Traceback (most recent call last):
>   File "/home/xiuli/PycharmProjects/benchmark/regression.py", line 103, in 
> <module>
>     ssc.start()
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py",
>  line 184, in start
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>  line 538, in __call__
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>  line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o42.start.
> : java.io.IOException: java.lang.NullPointerException
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
>         at 
> org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>         at 
> org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>         at 
> org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
>         at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
>         at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>         at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>         at 
> org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>         at py4j.Gateway.invoke(Gateway.java:259)
>         at 
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>         at 
> org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>         ... 61 more
> 15/06/18 12:25:37 INFO SparkContext: Invoking stop() from shutdown hook
> 15/06/18 12:25:37 INFO SparkUI: Stopped Spark web UI at 
> http://10.41.27.11:4040
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to