[ 
https://issues.apache.org/jira/browse/SPARK-8509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-8509:
-------------------------------------
    Description: 
Hi,

I am writing pyspark stream program. I have the training data set to compute 
the regression model. I want to use the stream data set to test the model. So, 
I join with RDD with the StreamRDD, but i got the exception. Following are my 
source code, and the exception I got. Any help is appreciated. Thanks

Regards,
Afancy
--------------------

{code}
from __future__ import print_function
import sys,os,datetime

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.context import SQLContext
from pyspark.resultiterable import ResultIterable
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
import numpy as np
import statsmodels.api as sm


def splitLine(line, delimiter='|'):
    values = line.split(delimiter)
    st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
    return (values[0],st.hour), values[2:]

def reg_m(y, x):
    ones = np.ones(len(x[0]))
    X = sm.add_constant(np.column_stack((x[0], ones)))
    for ele in x[1:]:
        X = sm.add_constant(np.column_stack((ele, X)))
    results = sm.OLS(y, X).fit()
    return results

def train(line):
    y,x = [],[]
    y, x = [],[[],[],[],[],[],[]]
    reading_tmp,temp_tmp = [],[]
    i = 0
    for reading, temperature in line[1]:
        if i%4==0 and len(reading_tmp)==4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp-20.0 if temp>20.0 else 0.0)
            x[4].append(16.0-temp if temp<16.0 else 0.0)
            x[5].append(5.0-temp if temp<5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i = i + 1
    return str(line[0]),reg_m(y, x).params.tolist()

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: regression.py <checkpointDir> <trainingDataDir> 
<streamDataDir>", file=sys.stderr)
        exit(-1)

    checkpoint, trainingInput, streamInput = sys.argv[1:]
    sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

    trainingLines = sc.textFile(trainingInput)
    modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
                                .groupByKey()\
                                .map(lambda line: train(line))\
                                .cache()


    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(checkpoint)
    lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, 
"|"))


    testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), 
line[1])).transform(lambda rdd:  rdd.leftOuterJoin(modelRDD))
    testRDD.pprint(20)

    ssc.start()
    ssc.awaitTermination()

------------------------
15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to 
60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
Traceback (most recent call last):
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py",
 line 90, in dumps
    return bytearray(self.serializer.dumps((func.func, func.deserializers)))
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", 
line 427, in dumps
    return cloudpickle.dumps(obj, 2)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 622, in dumps
    cp.dump(obj)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 107, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 193, in save_function
    self.save_function_tuple(obj)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 236, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 193, in save_function
    self.save_function_tuple(obj)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 236, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 193, in save_function
    self.save_function_tuple(obj)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 241, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 686, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", 
line 193, in __getnewargs__
    "It appears that you are attempting to broadcast an RDD or reference an RDD 
from an "
Exception: It appears that you are attempting to broadcast an RDD or reference 
an RDD from an action or transformation. RDD transformations and actions can 
only be invoked by the driver, not inside of other transformations; for 
example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the 
values transformation and count action cannot be performed inside of the 
rdd1.map transformation. For more information, see SPARK-5063.
15/06/18 12:25:37 ERROR StreamingContext: Error starting the context, marking 
it as stopped
java.io.IOException: java.lang.NullPointerException
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
        at 
org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at 
org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at 
org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
        at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
        at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
        at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
        at 
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at 
org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        ... 61 more
Traceback (most recent call last):
  File "/home/xiuli/PycharmProjects/benchmark/regression.py", line 103, in 
<module>
    ssc.start()
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py",
 line 184, in start
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
 line 538, in __call__
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
 line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o42.start.
: java.io.IOException: java.lang.NullPointerException
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
        at 
org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at 
org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at 
org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
        at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
        at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
        at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
        at 
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at 
org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        ... 61 more

15/06/18 12:25:37 INFO SparkContext: Invoking stop() from shutdown hook
15/06/18 12:25:37 INFO SparkUI: Stopped Spark web UI at http://10.41.27.11:4040
{code}


  was:
Hi,

I am writing pyspark stream program. I have the training data set to compute 
the regression model. I want to use the stream data set to test the model. So, 
I join with RDD with the StreamRDD, but i got the exception. Following are my 
source code, and the exception I got. Any help is appreciated. Thanks

Regards,
Afancy
--------------------

from __future__ import print_function
import sys,os,datetime

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.context import SQLContext
from pyspark.resultiterable import ResultIterable
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
import numpy as np
import statsmodels.api as sm


def splitLine(line, delimiter='|'):
    values = line.split(delimiter)
    st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
    return (values[0],st.hour), values[2:]

def reg_m(y, x):
    ones = np.ones(len(x[0]))
    X = sm.add_constant(np.column_stack((x[0], ones)))
    for ele in x[1:]:
        X = sm.add_constant(np.column_stack((ele, X)))
    results = sm.OLS(y, X).fit()
    return results

def train(line):
    y,x = [],[]
    y, x = [],[[],[],[],[],[],[]]
    reading_tmp,temp_tmp = [],[]
    i = 0
    for reading, temperature in line[1]:
        if i%4==0 and len(reading_tmp)==4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp-20.0 if temp>20.0 else 0.0)
            x[4].append(16.0-temp if temp<16.0 else 0.0)
            x[5].append(5.0-temp if temp<5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i = i + 1
    return str(line[0]),reg_m(y, x).params.tolist()

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: regression.py <checkpointDir> <trainingDataDir> 
<streamDataDir>", file=sys.stderr)
        exit(-1)

    checkpoint, trainingInput, streamInput = sys.argv[1:]
    sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

    trainingLines = sc.textFile(trainingInput)
    modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
                                .groupByKey()\
                                .map(lambda line: train(line))\
                                .cache()


    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(checkpoint)
    lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, 
"|"))


    testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), 
line[1])).transform(lambda rdd:  rdd.leftOuterJoin(modelRDD))
    testRDD.pprint(20)

    ssc.start()
    ssc.awaitTermination()

------------------------
15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to 
60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
Traceback (most recent call last):
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py",
 line 90, in dumps
    return bytearray(self.serializer.dumps((func.func, func.deserializers)))
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", 
line 427, in dumps
    return cloudpickle.dumps(obj, 2)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 622, in dumps
    cp.dump(obj)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 107, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 193, in save_function
    self.save_function_tuple(obj)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 236, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 193, in save_function
    self.save_function_tuple(obj)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 236, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 193, in save_function
    self.save_function_tuple(obj)
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", 
line 241, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 686, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", 
line 193, in __getnewargs__
    "It appears that you are attempting to broadcast an RDD or reference an RDD 
from an "
Exception: It appears that you are attempting to broadcast an RDD or reference 
an RDD from an action or transformation. RDD transformations and actions can 
only be invoked by the driver, not inside of other transformations; for 
example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the 
values transformation and count action cannot be performed inside of the 
rdd1.map transformation. For more information, see SPARK-5063.
15/06/18 12:25:37 ERROR StreamingContext: Error starting the context, marking 
it as stopped
java.io.IOException: java.lang.NullPointerException
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
        at 
org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at 
org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at 
org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
        at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
        at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
        at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
        at 
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at 
org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        ... 61 more
Traceback (most recent call last):
  File "/home/xiuli/PycharmProjects/benchmark/regression.py", line 103, in 
<module>
    ssc.start()
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py",
 line 184, in start
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
 line 538, in __call__
  File 
"/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
 line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o42.start.
: java.io.IOException: java.lang.NullPointerException
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
        at 
org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at 
org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at 
org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
        at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
        at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
        at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
        at 
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at 
org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        ... 61 more

15/06/18 12:25:37 INFO SparkContext: Invoking stop() from shutdown hook
15/06/18 12:25:37 INFO SparkUI: Stopped Spark web UI at http://10.41.27.11:4040



> Failed to JOIN in pyspark
> -------------------------
>
>                 Key: SPARK-8509
>                 URL: https://issues.apache.org/jira/browse/SPARK-8509
>             Project: Spark
>          Issue Type: Bug
>            Reporter: afancy
>
> Hi,
> I am writing pyspark stream program. I have the training data set to compute 
> the regression model. I want to use the stream data set to test the model. 
> So, I join with RDD with the StreamRDD, but i got the exception. Following 
> are my source code, and the exception I got. Any help is appreciated. Thanks
> Regards,
> Afancy
> --------------------
> {code}
> from __future__ import print_function
> import sys,os,datetime
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.sql.context import SQLContext
> from pyspark.resultiterable import ResultIterable
> from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
> import numpy as np
> import statsmodels.api as sm
> def splitLine(line, delimiter='|'):
>     values = line.split(delimiter)
>     st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
>     return (values[0],st.hour), values[2:]
> def reg_m(y, x):
>     ones = np.ones(len(x[0]))
>     X = sm.add_constant(np.column_stack((x[0], ones)))
>     for ele in x[1:]:
>         X = sm.add_constant(np.column_stack((ele, X)))
>     results = sm.OLS(y, X).fit()
>     return results
> def train(line):
>     y,x = [],[]
>     y, x = [],[[],[],[],[],[],[]]
>     reading_tmp,temp_tmp = [],[]
>     i = 0
>     for reading, temperature in line[1]:
>         if i%4==0 and len(reading_tmp)==4:
>             y.append(reading_tmp.pop())
>             x[0].append(reading_tmp.pop())
>             x[1].append(reading_tmp.pop())
>             x[2].append(reading_tmp.pop())
>             temp = float(temp_tmp[0])
>             del temp_tmp[:]
>             x[3].append(temp-20.0 if temp>20.0 else 0.0)
>             x[4].append(16.0-temp if temp<16.0 else 0.0)
>             x[5].append(5.0-temp if temp<5.0 else 0.0)
>         reading_tmp.append(float(reading))
>         temp_tmp.append(float(temperature))
>         i = i + 1
>     return str(line[0]),reg_m(y, x).params.tolist()
> if __name__ == "__main__":
>     if len(sys.argv) != 4:
>         print("Usage: regression.py <checkpointDir> <trainingDataDir> 
> <streamDataDir>", file=sys.stderr)
>         exit(-1)
>     checkpoint, trainingInput, streamInput = sys.argv[1:]
>     sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")
>     trainingLines = sc.textFile(trainingInput)
>     modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
>                                 .groupByKey()\
>                                 .map(lambda line: train(line))\
>                                 .cache()
>     ssc = StreamingContext(sc, 2)
>     ssc.checkpoint(checkpoint)
>     lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, 
> "|"))
>     testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), 
> line[1])).transform(lambda rdd:  rdd.leftOuterJoin(modelRDD))
>     testRDD.pprint(20)
>     ssc.start()
>     ssc.awaitTermination()
> ------------------------
> 15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to 
> 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
> Traceback (most recent call last):
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py",
>  line 90, in dumps
>     return bytearray(self.serializer.dumps((func.func, func.deserializers)))
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
>  line 427, in dumps
>     return cloudpickle.dumps(obj, 2)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 622, in dumps
>     cp.dump(obj)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 107, in dump
>     return Pickler.dump(self, obj)
>   File "/usr/lib/python2.7/pickle.py", line 224, in dump
>     self.save(obj)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 193, in save_function
>     self.save_function_tuple(obj)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 236, in save_function_tuple
>     save((code, closure, base_globals))
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
>     save(x)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 193, in save_function
>     self.save_function_tuple(obj)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 236, in save_function_tuple
>     save((code, closure, base_globals))
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
>     save(tmp[0])
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 193, in save_function
>     self.save_function_tuple(obj)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>  line 241, in save_function_tuple
>     save(f_globals)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
>     self._batch_setitems(obj.iteritems())
>   File "/usr/lib/python2.7/pickle.py", line 686, in _batch_setitems
>     save(v)
>   File "/usr/lib/python2.7/pickle.py", line 306, in save
>     rv = reduce(self.proto)
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 
> 193, in __getnewargs__
>     "It appears that you are attempting to broadcast an RDD or reference an 
> RDD from an "
> Exception: It appears that you are attempting to broadcast an RDD or 
> reference an RDD from an action or transformation. RDD transformations and 
> actions can only be invoked by the driver, not inside of other 
> transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is 
> invalid because the values transformation and count action cannot be 
> performed inside of the rdd1.map transformation. For more information, see 
> SPARK-5063.
> 15/06/18 12:25:37 ERROR StreamingContext: Error starting the context, marking 
> it as stopped
> java.io.IOException: java.lang.NullPointerException
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
>         at 
> org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>         at 
> org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>         at 
> org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
>         at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
>         at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>         at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>         at 
> org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>         at py4j.Gateway.invoke(Gateway.java:259)
>         at 
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>         at 
> org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>         ... 61 more
> Traceback (most recent call last):
>   File "/home/xiuli/PycharmProjects/benchmark/regression.py", line 103, in 
> <module>
>     ssc.start()
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py",
>  line 184, in start
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>  line 538, in __call__
>   File 
> "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>  line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o42.start.
> : java.io.IOException: java.lang.NullPointerException
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
>         at 
> org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>         at 
> org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>         at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>         at 
> org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
>         at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
>         at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>         at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>         at 
> org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>         at py4j.Gateway.invoke(Gateway.java:259)
>         at 
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>         at 
> org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>         ... 61 more
> 15/06/18 12:25:37 INFO SparkContext: Invoking stop() from shutdown hook
> 15/06/18 12:25:37 INFO SparkUI: Stopped Spark web UI at 
> http://10.41.27.11:4040
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to