Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4
Stacktrace would be helpful if you can provide that.

On Mon, Oct 19, 2015 at 1:42 PM, fahad shah wrote:
> Hi
>
> I am trying to build pair RDDs and group by key, assigning an ID based
> on the key. I am using PySpark with Spark 1.3 and, for some reason, I am
> getting an error that I am unable to figure out - any help is much
> appreciated.
>
> Things I tried (to no effect):
>
> 1. made sure I am not doing any conversions on the strings
> 2. made sure the fields used in the key are all present and not the
>    empty string (otherwise I toss the row out)
>
> My code is along the following lines (split uses StringIO to parse the
> CSV, isHeader filters out the header row, and parse_train puts the 54
> fields into a named tuple after whitespace/quote removal):
>
> # The "string argument" error is thrown on BB.take(1), where the
> # groupByKey is evaluated
>
> A = sc.textFile("train.csv") \
>       .filter(lambda x: not isHeader(x)) \
>       .map(split) \
>       .map(parse_train) \
>       .filter(lambda x: x is not None)
>
> A.count()
>
> B = A.map(lambda k: ((k.srch_destination_id, k.srch_length_of_stay,
>                       k.srch_booking_window, k.srch_adults_count,
>                       k.srch_children_count, k.srch_room_count),
>                      k[0:54]))
> BB = B.groupByKey()
> BB.take(1)
>
> best fahad

--
Best Regards

Jeff Zhang
Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4
Thanks Jeff, please find the stack trace below:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
      1 BB = B.groupByKey()
---> 2 BB.take(1)

C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\rdd.py in take(self, num)
   1222
   1223             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1224             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1225
   1226             items += res

C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    840         mappedRDD = rdd.mapPartitions(partitionFunc)
    841         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions,
--> 842                                           allowLocal)
    843         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    844

C:\apps\dist\IPython-3.1.0.0.0.0.0-0001\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                                         self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

C:\apps\dist\IPython-3.1.0.0.0.0.0-0001\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    298             raise Py4JJavaError(
    299                 'An error occurred while calling {0}{1}{2}.\n'.
--> 300                 format(target_id, '.', name), value)
    301         else:
    302             raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 19.0 failed 4 times, most recent failure: Lost task 4.3 in stage 19.0 (TID 95, workernode15.expediademocluster.j1.internal.cloudapp.net): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\worker.py", line 101, in main
    process()
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\serializers.py", line 125, in dump_stream
    for obj in iterator:
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\rdd.py", line 1626, in add_shuffle_key
    for k, v in iterator:
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\shuffle.py", line 383, in _external_items
    False)
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\shuffle.py", line 288, in mergeCombiners
    for k, v in iterator:
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\serializers.py", line 131, in load_stream
    yield self._read_with_length(stream)
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\serializers.py", line 148, in _read_with_length
    length = read_int(stream)
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\serializers.py", line 529, in read_int
    return struct.unpack("!i", length)[0]
error: unpack requires a string argument of length 4

        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:311)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at
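The Python-side frames are the informative part of this trace: the task dies in pyspark/shuffle.py while mergeCombiners is reading back spilled groupByKey data, and read_int then receives fewer than the 4 bytes it expects from the stream. The sketch below shows two mitigations commonly tried for spill-related shuffle failures; it is not from the thread, and the partition count and memory setting are illustrative assumptions rather than recommendations.

    from pyspark import SparkConf, SparkContext

    # Sketch only, not from the thread. The failure happens while PySpark's
    # external merger reads spilled shuffle data, so the usual levers are:
    # shuffle into more (smaller) partitions, and allow the Python worker
    # more memory before it starts spilling. Both values are assumptions.
    conf = (SparkConf()
            .setAppName("groupbykey-mitigation-sketch")
            .set("spark.python.worker.memory", "1g"))  # Spark 1.3 default: 512m
    sc = SparkContext(conf=conf)

    pairs = sc.parallelize([(i % 100, str(i)) for i in range(10000)])  # dummy data
    grouped = pairs.groupByKey(numPartitions=200)  # more partitions => smaller spills
    print(grouped.take(1))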
Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4
Thanks Davies, sure, I can share the code/data in a PM - best fahad

On Mon, Oct 19, 2015 at 10:52 AM, Davies Liu wrote:
> Could you simplify the code a little bit so we can reproduce the failure?
> (It may also help to share a small sample dataset if the failure depends
> on the data.)
>
> On Sun, Oct 18, 2015 at 10:42 PM, fahad shah wrote:
>> Hi
>>
>> I am trying to build pair RDDs and group by key, assigning an ID based
>> on the key. I am using PySpark with Spark 1.3 and, for some reason, I am
>> getting an error that I am unable to figure out - any help is much
>> appreciated.
>>
>> Things I tried (to no effect):
>>
>> 1. made sure I am not doing any conversions on the strings
>> 2. made sure the fields used in the key are all present and not the
>>    empty string (otherwise I toss the row out)
>>
>> My code is along the following lines (split uses StringIO to parse the
>> CSV, isHeader filters out the header row, and parse_train puts the 54
>> fields into a named tuple after whitespace/quote removal):
>>
>> # The "string argument" error is thrown on BB.take(1), where the
>> # groupByKey is evaluated
>>
>> A = sc.textFile("train.csv") \
>>       .filter(lambda x: not isHeader(x)) \
>>       .map(split) \
>>       .map(parse_train) \
>>       .filter(lambda x: x is not None)
>>
>> A.count()
>>
>> B = A.map(lambda k: ((k.srch_destination_id, k.srch_length_of_stay,
>>                       k.srch_booking_window, k.srch_adults_count,
>>                       k.srch_children_count, k.srch_room_count),
>>                      k[0:54]))
>> BB = B.groupByKey()
>> BB.take(1)
>>
>> best fahad
Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4
Could you simplify the code a little bit so we can reproduce the failure? (It may also help to share a small sample dataset if the failure depends on the data.)

On Sun, Oct 18, 2015 at 10:42 PM, fahad shah wrote:
> Hi
>
> I am trying to build pair RDDs and group by key, assigning an ID based
> on the key. I am using PySpark with Spark 1.3 and, for some reason, I am
> getting an error that I am unable to figure out - any help is much
> appreciated.
>
> Things I tried (to no effect):
>
> 1. made sure I am not doing any conversions on the strings
> 2. made sure the fields used in the key are all present and not the
>    empty string (otherwise I toss the row out)
>
> My code is along the following lines (split uses StringIO to parse the
> CSV, isHeader filters out the header row, and parse_train puts the 54
> fields into a named tuple after whitespace/quote removal):
>
> # The "string argument" error is thrown on BB.take(1), where the
> # groupByKey is evaluated
>
> A = sc.textFile("train.csv") \
>       .filter(lambda x: not isHeader(x)) \
>       .map(split) \
>       .map(parse_train) \
>       .filter(lambda x: x is not None)
>
> A.count()
>
> B = A.map(lambda k: ((k.srch_destination_id, k.srch_length_of_stay,
>                       k.srch_booking_window, k.srch_adults_count,
>                       k.srch_children_count, k.srch_room_count),
>                      k[0:54]))
> BB = B.groupByKey()
> BB.take(1)
>
> best fahad
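A stripped-down reproduction of the shape of the failing job, along the lines requested here, might look like the following sketch. The rows are synthetic stand-ins (the actual train.csv is not included in the thread): 54 string fields per record, keyed on the first six, mirroring the structure of the original pipeline.

    from pyspark import SparkContext

    sc = SparkContext(appName="groupbykey-min-repro")

    # Synthetic 54-field rows standing in for the parsed train.csv records;
    # key on six fields, matching the shape of the original job.
    rows = sc.parallelize(range(100000)) \
             .map(lambda i: tuple(str(i + j) for j in range(54)))
    pairs = rows.map(lambda r: (r[0:6], r))
    grouped = pairs.groupByKey()
    print(grouped.take(1))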
pyspark groupbykey throwing error: unpack requires a string argument of length 4
Hi

I am trying to build pair RDDs and group by key, assigning an ID based on the key. I am using PySpark with Spark 1.3 and, for some reason, I am getting an error that I am unable to figure out - any help is much appreciated.

Things I tried (to no effect):

1. made sure I am not doing any conversions on the strings
2. made sure the fields used in the key are all present and not the empty string (otherwise I toss the row out)

My code is along the following lines (split uses StringIO to parse the CSV, isHeader filters out the header row, and parse_train puts the 54 fields into a named tuple after whitespace/quote removal):

    # The "string argument" error is thrown on BB.take(1), where the
    # groupByKey is evaluated

    A = sc.textFile("train.csv") \
          .filter(lambda x: not isHeader(x)) \
          .map(split) \
          .map(parse_train) \
          .filter(lambda x: x is not None)

    A.count()

    B = A.map(lambda k: ((k.srch_destination_id, k.srch_length_of_stay,
                          k.srch_booking_window, k.srch_adults_count,
                          k.srch_children_count, k.srch_room_count),
                         k[0:54]))
    BB = B.groupByKey()
    BB.take(1)

best fahad
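The helpers referenced above (isHeader, split, parse_train) were not posted in the thread; a plausible reconstruction based on the description might look like the sketch below. The column order and every field name except the six used in the key are invented for illustration, and the key fields are assumed to come first (Python 3 shown; the original thread predates that, using Python 2's StringIO.StringIO).

    import csv
    from collections import namedtuple
    from io import StringIO  # Python 2 would use StringIO.StringIO

    # Hypothetical reconstruction of the helpers described in the message.
    # Column order and all names except the six key fields are assumptions.
    FIELDS = (["srch_destination_id", "srch_length_of_stay",
               "srch_booking_window", "srch_adults_count",
               "srch_children_count", "srch_room_count"] +
              ["field_%d" % i for i in range(6, 54)])
    Train = namedtuple("Train", FIELDS)

    def isHeader(line):
        # Assumes the header row begins with the first column name.
        return line.startswith(FIELDS[0])

    def split(line):
        # Parse a single CSV line with the csv module over an in-memory buffer.
        return next(csv.reader(StringIO(line)))

    def parse_train(fields):
        # Strip whitespace/quotes, and drop rows that are incomplete or have
        # an empty key field (assumed here to be the first six columns).
        cleaned = [f.strip().strip('"') for f in fields]
        if len(cleaned) != 54 or any(c == "" for c in cleaned[:6]):
            return None
        return Train(*cleaned)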