I am using a PySpark stateful stream (Spark 2.0) that receives JSON over a socket. When I send only one record I get the expected response, but as soon as I send more than one record I get the error below.
    def createmd5Hash(po):
        data = json.loads(po)
        return (hashlib.md5(data['somevalue'].encode('utf-8')).hexdigest(), data)

Implementation 1:

    stream = ssc.socketTextStream("ip", 3341)
    ssc.checkpoint('E:\\Work\\Python1\\work\\spark\\checkpoint\\')
    initialStateRDD = sc.parallelize([(u'na', 1)])
    with_hash = stream.map(lambda po: createmd5Hash(po)).reduceByKey(lambda s1, s2: s1)

Implementation 2:

    stream = ssc.textFileStream('C:\\sparkpoc\\input')
    ssc.checkpoint('E:\\Work\\Python1\\work\\spark\\checkpoint\\')
    initialStateRDD = sc.parallelize([(u'na', 1)])
    with_hash = stream.map(lambda po: createmd5Hash(po)).reduceByKey(lambda s1, s2: s1)

To be specific: I get the expected result when I read the JSON from the file system with textFileStream (Implementation 2), but I get the following error when I use the socket stream socketTextStream (Implementation 1):

    16/10/06 20:50:42 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "E:\Work\spark\installtion\spark\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
      File "E:\Work\spark\installtion\spark\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
      File "E:\Work\spark\installtion\spark\python\pyspark\rdd.py", line 2371, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "E:\Work\spark\installtion\spark\python\pyspark\rdd.py", line 2371, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "E:\Work\spark\installtion\spark\python\pyspark\rdd.py", line 317, in func
        return f(iterator)
      File "E:\Work\spark\installtion\spark\python\pyspark\rdd.py", line 1792, in combineLocally
        merger.mergeValues(iterator)
      File "E:\Work\spark\installtion\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 236, in mergeValues
        for k, v in iterator:
      File "E:/Work/Python1/work/spark/streamexample.py", line 159, in <lambda>
        with_hash = stream.map(lambda po : createmd5Hash(po)).reduceByKey(lambda s1,s2:s1)
      File "E:/Work/Python1/work/spark/streamexample.py", line 31, in createmd5Hash
        data = json.loads(input_line)
      File "C:\Python34\lib\json\__init__.py", line 318, in loads
        return _default_decoder.decode(s)
      File "C:\Python34\lib\json\decoder.py", line 343, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "C:\Python34\lib\json\decoder.py", line 361, in raw_decode
        raise ValueError(errmsg("Expecting value", s, err.value)) from None
    ValueError: Expecting value: line 1 column 1 (char 0)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
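For context, the records are pushed to port 3341 by a small test feeder along these lines (a rough sketch only; the payloads are placeholders). Since socketTextStream connects out to the given host and port and splits the incoming byte stream on '\n', the feeder listens as a server and writes one newline-terminated JSON document per record:

    import json
    import socket

    # Rough sketch of the test feeder (placeholder payloads).
    records = [{'somevalue': 'a'}, {'somevalue': 'b'}]

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 3341))
    server.listen(1)
    conn, _ = server.accept()  # wait for the Spark receiver to connect
    for rec in records:
        # one JSON document per line, since the receiver splits on '\n'
        conn.sendall((json.dumps(rec) + '\n').encode('utf-8'))
    conn.close()
    server.close()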
Can someone please help me?

http://stackoverflow.com/questions/39897475/spark-stateful-streaming-error
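P.S. One way I could narrow this down is to log the raw line before parsing and skip anything that fails, e.g. with a guard like this sketch (createmd5Hash_safe is just an illustrative name, it is not in my job):

    import hashlib
    import json

    def createmd5Hash_safe(po):
        # Debug variant: show the raw line so partial or non-JSON input
        # from the socket becomes visible, and skip it instead of
        # crashing the task.
        try:
            data = json.loads(po)
        except ValueError:
            print('unparseable line: %r' % po)
            return None
        return (hashlib.md5(data['somevalue'].encode('utf-8')).hexdigest(), data)

    # reuses `stream` from the snippets above
    with_hash = stream.map(createmd5Hash_safe) \
                      .filter(lambda t: t is not None) \
                      .reduceByKey(lambda s1, s2: s1)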