Re: GroupByKey results in OOM - Any other alternative
Vivek, if the foldByKey solution doesn't work for you, my team uses RDD.persist(DISK_ONLY) to avoid OOM errors. It's slower, of course, and requires tuning other config parameters. It can also be a problem if you do not have enough disk space, meaning that you have to unpersist at the right points if you are running long flows.

For us, even though the disk writes are a performance hit, we prefer the Spark programming model to Hadoop M/R. But we are still working on getting this to work end to end on 100s of GB of data on our 16-node cluster.

Suren

On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS vivek...@gmail.com wrote:

Thanks for the input. I will give foldByKey a shot.

The way I am doing it is: the data is partitioned hourly, so I compute distinct values hourly. Then I use unionRDD to merge them and compute distinct over the overall data.

Is there a way to know which (key, value) pair is resulting in the OOM? Is there a way to set parallelism in the map stage so that each worker will process one key at a time?

I didn't realise countApproxDistinctByKey uses HyperLogLogPlus. This should be interesting.

--Vivek

On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote:

Grouping by key is always problematic, since a key might have a huge number of values. You can do a little better than grouping *all* values and *then* finding distinct values by using foldByKey, putting values into a Set. At least you end up with only distinct values in memory. (You don't need two maps either, right?)

If the number of distinct values is still huge for some keys, consider the experimental method countApproxDistinctByKey:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285

This should be much more performant, at the cost of some accuracy.

On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS vivek...@gmail.com wrote:

Hi, for the last couple of days I have been trying hard to get around this problem. Please share any insights on solving it.

Problem: there is a huge list of (key, value) pairs. I want to transform this to (key, distinct values) and then eventually to (key, distinct values count).

On a small dataset:

    groupByKey().map(x => (x._1, x._2.distinct))
    ...map(x => (x._1, x._2.distinct.count))

On a large dataset I am getting OOM. Is there a way to represent the Seq of values from groupByKey as an RDD and then perform distinct over it?

Thanks
Vivek

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
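To make Sean's foldByKey suggestion concrete, here is a minimal sketch runnable in spark-shell (the sample pairs are invented for illustration; `sc` is the usual SparkContext):

    import org.apache.spark.SparkContext._  // pair RDD functions in Spark 1.x

    // Invented (key, value) pairs standing in for the real hourly data.
    val pairs = sc.parallelize(Seq(("k1", "a"), ("k1", "a"), ("k1", "b"), ("k2", "c")))

    // Fold each key's values into a Set, so only *distinct* values are kept
    // per key instead of buffering every value the way groupByKey does.
    val distinctPerKey = pairs
      .mapValues(v => Set(v))
      .foldByKey(Set.empty[String])(_ ++ _)

    // (key, distinct values count)
    val countsPerKey = distinctPerKey.mapValues(_.size)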
Using custom class as a key for groupByKey() or reduceByKey()
I have a simple Java class as follows, that I want to use as a key while applying groupByKey or reduceByKey functions:

    private static class FlowId {
        public String dcxId;
        public String trxId;
        public String msgType;

        public FlowId(String dcxId, String trxId, String msgType) {
            this.dcxId = dcxId;
            this.trxId = trxId;
            this.msgType = msgType;
        }

        public boolean equals(Object other) {
            if (other == this) return true;
            if (other == null) return false;
            if (getClass() != other.getClass()) return false;
            FlowId fid = (FlowId) other;
            if (this.dcxId.equals(fid.dcxId) && this.trxId.equals(fid.trxId)
                    && this.msgType.equals(fid.msgType)) {
                return true;
            }
            return false;
        }
    }

I figured that an equals() method would need to be overridden to ensure comparison of keys, but entries with the same key are still listed separately after applying a groupByKey(), for example. What further modifications are necessary to enable using the above class as a key? Right now, I have fallen back to using Tuple3<String, String, String> instead of the FlowId class, but it makes the code unnecessarily verbose.
Re: Using custom class as a key for groupByKey() or reduceByKey()
In Java at large, you must always implement hashCode() when you implement equals(). This is not specific to Spark. It maintains the contract that two equal instances have the same hash code, and that's not the case for your class now. This causes weird things to happen wherever the hash code contract is depended upon. This probably works fine:

    @Override
    public int hashCode() {
        return dcxId.hashCode() ^ trxId.hashCode() ^ msgType.hashCode();
    }

On Sun, Jun 15, 2014 at 11:45 AM, Gaurav Jain ja...@student.ethz.ch wrote: [...]
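For comparison, on the Scala side the same pitfall is avoided by making the key a case class, since Scala generates consistent equals() and hashCode() for case classes automatically. A minimal sketch (the sample records are invented for illustration):

    import org.apache.spark.SparkContext._  // pair RDD functions in Spark 1.x

    // A case class gets equals() and hashCode() generated together,
    // so it is safe to use as a key for groupByKey/reduceByKey.
    case class FlowId(dcxId: String, trxId: String, msgType: String)

    // Invented sample records; both share the same logical key.
    val records = sc.parallelize(Seq(
      (FlowId("d1", "t1", "REQ"), 1),
      (FlowId("d1", "t1", "REQ"), 2)))

    // Both values group under one FlowId key because equals/hashCode agree.
    val grouped = records.groupByKey()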
Re: long GC pause during file.cache()
Hi, Wei

You may try to set JVM opts in spark-env.sh as follows, to prevent or mitigate GC pauses:

    export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m"

There are more options you could add; please just Google :)

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:

Hi, I have a single-node (192G RAM) stand-alone Spark, with memory configuration like this in spark-env.sh:

    SPARK_WORKER_MEMORY=180g
    SPARK_MEM=180g

In spark-shell I have a program like this:

    val file = sc.textFile("/localpath") // file size is 40G
    file.cache()
    val output = file.map(line => /* extract something from line */)
    output.saveAsTextFile(...)

When I run this program again and again, or keep trying file.unpersist() -> file.cache() -> output.saveAsTextFile(), the run time varies a lot, from 1 min to 3 min to 50+ min. Whenever the run time is more than 1 min, from the stage monitoring GUI I observe big GC pauses (some can be 10+ min). Of course when the run time is normal, say ~1 min, no significant GC is observed. The behavior seems somewhat random.

Is there any JVM tuning I should do to prevent this long GC pause from happening?

I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something like this:

    root 10994 1.7 0.6 196378000 1361496 pts/51 Sl+ 22:06 0:12 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main

Best regards,
Wei

--
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan
Re: long GC pause during file.cache()
SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you don’t mind the WARNING in the logs. You can set spark.executor.extraJavaOptions in your SparkConf object.

Best,

--
Nan Zhu

On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote: [...]
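A minimal sketch of setting this through SparkConf (the GC flags are carried over from Hao's example and the app name is a placeholder; note that heap size belongs in spark.executor.memory, not in the options string):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("gc-tuning-example")  // placeholder name
      // Executor JVM flags; do NOT put -Xmx here -- use spark.executor.memory.
      .set("spark.executor.extraJavaOptions",
        "-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -XX:MaxPermSize=256m")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)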
Re: MLlib: Decision Tree not getting built for 5 or more levels (maxDepth=5), and the one built for 3 levels is performing poorly
Hi Suraj,

I don't see any logs from MLlib. You might need to explicitly set the logging to DEBUG for MLlib. Adding this line to log4j.properties might fix the problem:

    log4j.logger.org.apache.spark.mllib.tree=DEBUG

Also, please let me know whether you encounter similar problems with the Spark master.

-Manish

On Sat, Jun 14, 2014 at 3:19 AM, SURAJ SHETH shet...@gmail.com wrote:

Hi Manish, thanks for your reply. I am attaching the logs here (regression, 5 levels); they contain the last 100s of lines. I am also attaching a screenshot of the Spark UI. The first 4 levels complete in less than 6 seconds, while the 5th level doesn't complete even after several hours. Since this is somebody else's data, I can't share it.

Can you check the code snippet attached in my first email and see if it needs something to enable it to work for large data and >= 5 levels? It is working for 3 levels on the same dataset, but not for 5 levels.

In the meantime, I will try to run it on the latest master and let you know the results. If it runs fine there, then it can be related to the 128 MB limit issue that you mentioned.

Thanks and Regards,
Suraj Sheth

On Sat, Jun 14, 2014 at 12:05 AM, Manish Amde manish...@gmail.com wrote:

Hi Suraj, I can't answer 1) without knowing the data. However, the results for 2) are surprising indeed. We have tested with a billion samples for regression tasks, so I am perplexed by this behavior. Could you try the latest Spark master to see whether the problem goes away? It has code that limits memory consumption at the master and worker nodes to 128 MB by default, which ideally should not be needed given the amount of RAM on your cluster. Also, feel free to send the DEBUG logs. They might give me a better idea of where the algorithm is getting stuck.

-Manish

On Wed, Jun 11, 2014 at 1:20 PM, SURAJ SHETH shet...@gmail.com wrote:

Hi Filipus, the train data is already oversampled. The number of positives I mentioned above is for the test dataset: 12,028 (apologies for not making this clear earlier). The train dataset has 61,264 positives out of 689,763 total rows; the number of negatives is 628,499. Oversampling was done for the train dataset to ensure that we have at least 9-10% positives in the train part. No oversampling is done for the test dataset.

So, the only difference that remains is the amount of data used for building a tree. But I have a few more questions: have we tried how much data can be used, at most, to build a single Decision Tree? Since I have enough RAM to fit all the data into memory (only 1.3 GB of train data and 30 x 3 GB of RAM), I would expect it to build a single Decision Tree with all the data without any issues. But for maxDepth = 5, it is not able to. I confirmed that when it keeps running for hours, the amount of free memory available is more than 70%, so it doesn't seem to be a memory issue either.

Thanks and Regards,
Suraj Sheth

On Wed, Jun 11, 2014 at 10:19 PM, filipus floe...@gmail.com wrote:

Well, I guess your problem is quite unbalanced, and due to the information value as a splitting criterion I guess the algo stops after very few splits. A workaround is oversampling: build many training datasets, e.g. randomly take 50% of the positives and the same amount from the negatives, or let's say double that (6,000 positives and 12,000 negatives), and build a tree. This you do many times, giving many models (agents), and then you make an ensemble model: all the models vote, in a way similar to a random forest, but built in a completely different way.
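A minimal sketch of the oversample-and-vote idea filipus describes (the dataset names, sampling fractions, and buildTree helper are all hypothetical):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.regression.LabeledPoint

    // Hypothetical: positives/negatives are the two class subsets of the
    // train set; buildTree stands in for the real DecisionTree training call.
    def ensemble[M](positives: RDD[LabeledPoint], negatives: RDD[LabeledPoint],
                    buildTree: RDD[LabeledPoint] => M): Seq[M] =
      (1 to 10).map { seed =>
        // ~50% of the positives; the negative fraction (illustrative) should
        // be tuned so the negative sample is roughly twice the positive one.
        val pos = positives.sample(false, 0.5, seed)
        val neg = negatives.sample(false, 0.02, seed)
        buildTree(pos.union(neg))
      }
    // Final prediction: majority vote (or average, for regression) across
    // the returned models, similar to a random forest.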
Akka listens on hostname while user may spark-submit with an IP-based master URL
Hi, All

In Spark, spark.driver.host defaults to the driver's hostname; thus, the Akka actor system will listen on a URL like akka.tcp://hostname:port. However, when a user tries to use spark-submit to run an application, the user may set --master spark://192.168.1.12:7077. Then the AppClient in SparkDeploySchedulerBackend cannot successfully register with the Master, and the console prints:

    WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

I think we need to improve this by making Akka recognise both the hostname and the corresponding IP, or at least add lines to the Spark documentation to keep users from using a raw IP. Any comments?

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com
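As a user-side workaround in the meantime, a minimal sketch ("master-host" and "driver-host" are placeholders for real hostnames): pass the master URL in the same hostname form the Master registered under, and optionally pin the driver's own address:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("hostname-url-example")  // placeholder name
      // Use the hostname the Master actually bound to, not its IP.
      .setMaster("spark://master-host:7077")
      // Optionally pin the driver's own address so executors can connect back.
      .set("spark.driver.host", "driver-host")
    val sc = new SparkContext(conf)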
Re: long GC pause during file.cache()
Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?

On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote: [...]
Re: long GC pause during file.cache()
Yes, I think in the spark-env.sh.template it is listed in the comments (didn’t check…).

Best,

--
Nan Zhu

On Sunday, June 15, 2014 at 5:21 PM, Surendranauth Hiraman wrote: [...]
pyspark serializer can't handle functions?
It seems that the default serializer used by pyspark can't serialize a list of functions. I've seen some posts about trying to fix this by using dill to serialize rather than pickle. Does anyone know what the status of that project is, or whether there's another easy workaround?

I've pasted a sample error message below. Here, regs is a function defined in another file, myfile.py, that has been included on all workers via the pyFiles argument to SparkContext: sc = SparkContext("local", "myapp", pyFiles=["myfile.py"]).

    File "runfile.py", line 45, in __init__
      regsRDD = sc.parallelize([regs]*self.n)
    File "/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/context.py", line 223, in parallelize
      serializer.dump_stream(c, tempFile)
    File "/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line 182, in dump_stream
      self.serializer.dump_stream(self._batched(iterator), stream)
    File "/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line 118, in dump_stream
      self._write_with_length(obj, stream)
    File "/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line 128, in _write_with_length
      serialized = self.dumps(obj)
    File "/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line 270, in dumps
      def dumps(self, obj): return cPickle.dumps(obj, 2)
    cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Re: GroupByKey results in OOM - Any other alternative
Depending on your requirements, when computing distinct cardinality for hourly metrics a much more scalable method would be to use a HyperLogLog data structure. A Scala implementation people have used with Spark is:
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala

On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: [...]
Re: long GC pause during file.cache()
Note also that Java does not work well with very large JVM heaps, due to this exact issue (long GC pauses). There are two commonly used workarounds:

1) Spawn multiple (smaller) executors on the same machine. This can be done by creating multiple Workers (via SPARK_WORKER_INSTANCES in standalone mode [1]), as sketched below.

2) Use Tachyon for off-heap caching of RDDs, allowing Spark executors to be smaller and avoid GC pauses.

[1] See the standalone documentation here:
http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts

On Sun, Jun 15, 2014 at 3:50 PM, Nan Zhu zhunanmcg...@gmail.com wrote: [...]
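A minimal sketch of option 1) in spark-env.sh (the sizes are illustrative for a 192 GB machine, not a recommendation; SPARK_WORKER_MEMORY applies per worker):

    # Run several smaller workers per machine instead of one 180g-heap worker.
    export SPARK_WORKER_INSTANCES=4
    export SPARK_WORKER_MEMORY=45g
    export SPARK_WORKER_CORES=8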
Re: GroupByKey results in OOM - Any other alternative
Ian, yep, HLL is an appropriate mechanism. countApproxDistinctByKey is a wrapper around com.clearspring.analytics.stream.cardinality.HyperLogLogPlus.

Cheers
k/

On Sun, Jun 15, 2014 at 4:50 PM, Ian O'Connell i...@ianoconnell.com wrote: [...]
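A minimal sketch of the approximate route, runnable in spark-shell (the sample pairs are invented; 0.05 is the target relative standard deviation and is illustrative):

    import org.apache.spark.SparkContext._  // pair RDD functions in Spark 1.x

    // Invented (key, value) pairs standing in for the real hourly data.
    val pairs = sc.parallelize(Seq(("k1", "a"), ("k1", "b"), ("k1", "a"), ("k2", "c")))

    // Approximate distinct-value count per key, backed by HyperLogLogPlus.
    val approxCounts = pairs.countApproxDistinctByKey(0.05)  // RDD[(String, Long)]
    approxCounts.collect()  // e.g. Array((k1, 2), (k2, 1))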