Have you considered using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey in place of the groupBy to achieve better performance?
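For example, if the per-key work can be phrased as an incremental aggregation, something along these lines might help (just a rough sketch -- the element-wise sum is only a placeholder for whatever you actually do with the 53K arrays per key, and it reuses your sc, file path and getRows from the snippet below):

import numpy as np

def getRows(z):
    return np.asfortranarray([float(g) for g in z.split(',')])

pairs = sc.textFile('/textFile.txt', 480).filter(lambda x: len(x) > 1000)\
    .map(lambda x: x.rsplit('\t', 1))\
    .map(lambda x: (x[0].split('\t')[1], getRows(x[1])))  # (key, array of 8192 floats)

zero = np.zeros(8192)
sums = pairs.aggregateByKey(zero,
                            lambda acc, v: acc + v,  # fold a value into the per-partition accumulator
                            lambda a, b: a + b)      # merge accumulators from different partitions

sums.count()

Unlike groupBy, this combines values map-side before the shuffle, so no single key ever has to be materialized as one huge Python object.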
Cheers

On Sat, Apr 9, 2016 at 2:00 PM, SURAJ SHETH <shet...@gmail.com> wrote:

> Hi,
> I am using Spark 1.5.2.
>
> The file contains 900K rows, each with twelve fields (tab separated).
> The first 11 fields are Strings with a maximum of 20 chars each. The last
> field is a comma-separated array of floats with 8,192 values.
>
> It works perfectly if I change the groupBy in the code below from
> "x[0].split('\t')[1]" to "x[0]".
> The reason seems to be a limit on the number of values for a particular
> key in groupBy. In the code below, I am expecting 500 keys, with tens of
> thousands of values in a few of them. The largest key-value pair (from
> groupByKey) has 53K values, each having a numpy array of 8,192 floats.
> In the changed version, i.e. "groupBy(lambda x : x[0]).mapValues(", we get
> 900K keys with one value each, which works flawlessly.
>
> Do we have any limit on the amount of data we get for a key in groupBy?
>
> The total file size is 16 GB.
>
> The snippet is:
>
> import hashlib, re, numpy as np
>
> def getRows(z):
>     return np.asfortranarray([float(g) for g in z.split(',')])
>
> text1 = sc.textFile('/textFile.txt', 480).filter(lambda x: len(x) > 1000)\
>     .map(lambda x: x.rsplit('\t', 1)).map(lambda x: [x[0], getRows(x[1])]).cache()\
>     .groupBy(lambda x: x[0].split('\t')[1]).mapValues(lambda x: list(x)).cache()
>
> text1.count()
>
> Thanks and Regards,
> Suraj Sheth
>
> On Sun, Apr 10, 2016 at 1:19 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> The value was out of the range of integer.
>>
>> Which Spark release are you using?
>>
>> Can you post a snippet of code which can reproduce the error?
>>
>> Thanks
>>
>> On Sat, Apr 9, 2016 at 12:25 PM, SURAJ SHETH <shet...@gmail.com> wrote:
>>
>>> I am trying to perform some processing, then cache and count the RDD.
>>> Any solutions?
>>>
>>> Seeing a weird error:
>>>
>>> File "/mnt/yarn/usercache/hadoop/appcache/application_1456909219314_0014/container_1456909219314_0014_01_000004/pyspark.zip/pyspark/serializers.py",
>>>     line 550, in write_int
>>>   stream.write(struct.pack("!i", value))
>>> error: 'i' format requires -2147483648 <= number <= 2147483647
>>>
>>> at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>> at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>>
>>> Thanks and Regards,
>>> Suraj Sheth
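P.S. The struct.pack error in the quoted trace is the 32-bit length field in PySpark's serializer overflowing. A back-of-the-envelope estimate (assuming 8-byte floats and ignoring pickling overhead) shows why the largest key alone can blow past that limit:

values_per_key = 53000      # arrays under the largest groupBy key
floats_per_array = 8192
bytes_per_float = 8         # assuming float64
print(values_per_key * floats_per_array * bytes_per_float)  # ~3.47e9 bytes, well above 2**31 - 1 = 2147483647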