Map Reduce -v- Parallelism
Hi, is this guy a silly billy for comparing Apache Flink with Apache Spark? https://www.youtube.com/watch?v=sYlbD_OoHhs "Airbus makes more of the sky with Flink" - Jesse Anderson & Hassene Ben Salem. Does Apache Spark on Hadoop support distributed processing as well as map-reduce and parallelism? Ray says Flink is glued together; is there any truth in this? Now that I am an expert in Machine Learning algorithms (https://backbutton.co.uk/about.html): if I download, prototype, and put into production the free Apache Spark, do I have to pay any licence fees? Can I download, prototype, and put into production using the online documents in secret, without joining the mailing list and without telling anyone? https://backbutton.co.uk
Re: When queried through HiveContext, does Hive execute these queries using its execution engine (default is map-reduce), or does Spark just read the data and perform those queries itself?
To add to what Vikash said above, a bit more on the internals:

1. There are two components that work together to achieve the Hive + Spark integration:
a. HiveContext, which extends SQLContext and adds Hive-specific logic, e.g. loading the jars needed to talk to the underlying metastore DB and loading the configs in hive-site.xml.
b. HiveThriftServer2, which uses the native HiveServer2 and adds logic for creating sessions and handling operations.

2. Once the Thrift server is up, authentication and session management are all delegated to the Hive classes. Once parsing of the query is done, a logical plan is created and passed on to create a DataFrame. So there is no MapReduce: Spark intelligently uses the pieces it needs from Hive and runs its own execution engine.

--Regards, Lalit
Re: When queried through HiveContext, does Hive execute these queries using its execution engine (default is map-reduce), or does Spark just read the data and perform those queries itself?
Himanshu,

Spark doesn't use the Hive execution engine (MapReduce) to execute queries. Spark only reads the metadata from the Hive metastore DB and executes the query within the Spark execution engine. This metadata is used by Spark's own SQL execution engine (which includes components such as Catalyst and Tungsten to optimize queries) to execute the query and generate results faster than Hive (MapReduce).

Using HiveContext means connecting to the Hive metastore DB. Thus, HiveContext can access Hive metadata, and that metadata includes the location of the data, serializers and deserializers, compression codecs, columns, datatypes, etc. Spark therefore has enough information about the Hive tables and their data to understand the target data and execute the query on its own execution engine.

Overall, Spark replaces the MapReduce model completely with its in-memory (RDD) computation engine.

- Vikash Pareek
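For reference, a minimal sketch of the flow described above (assuming a Spark 1.6-era setup with hive-site.xml on the classpath; the table name and query are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveQueryExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hive-query"))
    // HiveContext reads hive-site.xml and connects to the metastore,
    // but the query below runs on Spark's own engine, not MapReduce.
    val hiveContext = new HiveContext(sc)
    val df = hiveContext.sql("SELECT city, count(*) FROM some_table GROUP BY city")
    df.show()
    sc.stop()
  }
}

Running something like this shows Spark stages in the Spark UI; no MapReduce jobs appear on the Hive side.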
When queried through HiveContext, does Hive execute these queries using its execution engine (default is map-reduce), or does Spark just read the data and perform those queries itself?
So what happens underneath when we query a Hive table using HiveContext?

1. Does Spark talk to the metastore to get the data location on HDFS and read the data from there to perform those queries?
2. Or does Spark pass those queries to Hive, and Hive executes them on the table and returns the results to Spark? In this case, might Hive be using map-reduce to execute the queries?

Please clarify this confusion. I have looked into the code and it seems like Spark is just fetching the data from HDFS. Please convince me otherwise. Thanks, Best
DIMSUM among 550k objects on AWS Elastic Map Reduce fails with OOM errors
Hello everyone, I am trying to compute the similarity between 550k objects using the DIMSUM algorithm available in Spark 1.6. The cluster runs on AWS Elastic MapReduce and consists of 6 r3.2xlarge instances (one master and five core nodes), each with 8 vCPUs and 61 GiB of RAM. My input data is a 3.5GB CSV file hosted on AWS S3, which I use to build a RowMatrix with 550k columns and 550k rows, passing sparse vectors as rows to the RowMatrix constructor.

Every attempt I've made so far fails during the mapPartitionsWithIndex stage of the RowMatrix.columnSimilarities() method (source code at https://github.com/apache/spark/blob/v1.6.0/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L587), with YARN containers 1) exiting with FAILURE due to an OutOfMemory exception on Java heap space (thanks to Spark, apparently) or 2) terminated by the AM (and increasing spark.yarn.executor.memoryOverhead as suggested doesn't seem to work).

I tried and combined different approaches without noticing significant improvements:
- setting the AWS EMR maximizeResourceAllocation option to true (details at https://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-spark-configure.html)
- increasing the number of partitions (via spark.default.parallelism, up to 8000)
- increasing the driver and executor memory (from the defaults of ~512M / ~5G to ~50G / ~15G respectively)
- increasing the YARN memory overhead (from the default 10% up to 40% of driver and executor memory, respectively)
- setting the DIMSUM threshold to 0.5 and 0.8 to reduce the number of comparisons

Does anyone have any idea about the possible cause(s) of these errors? Thank you.
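For reference, a minimal sketch of the DIMSUM call being described (assuming Spark 1.6 MLlib; the input path and line format are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

object DimsumExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dimsum"))
    // Each line is assumed to hold comma-separated "index:value" pairs
    // describing one sparse row of the 550k x 550k matrix.
    val rows = sc.textFile("s3://bucket/input.csv").map { line =>
      val entries = line.split(",").map { kv =>
        val Array(i, v) = kv.split(":")
        (i.toInt, v.toDouble)
      }
      Vectors.sparse(550000, entries)
    }
    val matrix = new RowMatrix(rows)
    // Passing a threshold enables DIMSUM sampling; higher thresholds
    // discard more low-similarity pairs and reduce shuffle volume.
    val similarities = matrix.columnSimilarities(0.5)
    println(similarities.entries.count())
    sc.stop()
  }
}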
Simple Map Reduce taking lot of time
Hi All, I'm running Spark 1.4.1 on an 8 core machine with 16 GB RAM. I have a 500MB CSV file with 10 columns and I need to separate it into multiple CSV/Parquet files based on one of the fields in the CSV file. I've loaded the CSV file using spark-csv and applied the transformations below. It takes a lot of time (more than 20-30 mins) and sometimes terminates with OOM. Any idea of better ways to do it? Thanks in advance!

I start spark-shell using the below options:

# Enable the Kryo serializer
bin/spark-shell --driver-memory 6G --executor-memory 6G --master local[3] --conf spark.kryoserializer.buffer.max=200m --packages com.databricks:spark-csv_2.11:1.1.0

val df = sqlContext.load("com.databricks.spark.csv",
  Map("header" -> "true",
      "path" -> "file:///file.csv",
      "partitionColumn" -> "date",
      "numPartitions" -> "4"))

df.map(r => (r(2), List(r))).reduceByKey((a, b) => a ++ b)

-- Thanks, M. Varadharajan

"Experience is what you get when you didn't get what you wanted" - By Prof. Randy Pausch in "The Last Lecture"

My Journal :- http://varadharajan.in
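One alternative worth noting (a sketch only, assuming the Spark 1.4 DataFrame API and the spark-csv reader shown above; the column name "date" is taken from the options above): a partitioned Parquet write does the splitting without collecting rows per key into growing lists.

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("file:///file.csv")

// Writes one directory per distinct value of the partition column,
// e.g. out/date=2015-07-01/part-*.parquet
df.write.partitionBy("date").parquet("file:///out")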
Re: How to use spark for map-reduce flow to filter N columns, top M rows of all csv files under a folder?
You can also look into https://spark.apache.org/docs/latest/tuning.html for performance tuning. Thanks, Best Regards

On Mon, Jun 15, 2015 at 10:28 PM, Rex X <dnsr...@gmail.com> wrote:

Thanks very much, Akhil. That solved my problem. Best, Rex
Re: How to use spark for map-reduce flow to filter N columns, top M rows of all csv files under a folder?
Something like this?

val huge_data = sc.textFile("/path/to/first.csv")
  .map(x => (x.split("\t")(1), x.split("\t")(0)))
val gender_data = sc.textFile("/path/to/second.csv")
  .map(x => (x.split("\t")(0), x))
val joined_data = huge_data.join(gender_data)
joined_data.take(1000)

It's Scala btw; the Python API should be similar. Thanks, Best Regards
How to use spark for map-reduce flow to filter N columns, top M rows of all csv files under a folder?
To be concrete, say we have a folder with thousands of tab-delimited CSV files with the following attribute format (each csv file is about 10GB):

id    name    address    city    ...
1     Matt    add1       LA      ...
2     Will    add2       LA      ...
3     Lucy    add3       SF      ...
...

And we have a lookup table based on "name" above:

name    gender
Matt    M
Lucy    F
...

Now we are interested in outputting the top 1000 rows of each csv file in the following format:

id    name    gender
1     Matt    M
...

Can we use pyspark to efficiently handle this?
Re: map - reduce only with disk
You shouldn't have to persist the RDD at all; just call flatMap and reduce on it directly. If you try to persist it, that will try to load the original data into memory, but here you are only scanning through it once. Matei

On Jun 2, 2015, at 2:09 AM, Octavian Ganea <octavian.ga...@inf.ethz.ch> wrote:

Thanks, I was actually using reduceByKey, not groupByKey. I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However, I got the same error as before, namely the error described here: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur in a set of documents at least 5 times. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs that are spilled by flatMap might need to be stored on disk, but I don't understand why the persist option does not work and my job fails.

My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
  .reduceByKey((a, b) => (a + b).toShort)
  .filter { case ((x, y), count) => count >= 5 }

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One node I keep for the master, 7 nodes for the workers.

My conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

My spark-env.sh:

ulimit -n 20
SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=129G

Spark version: 1.1.1. Thank you a lot for your help!

2015-06-02 4:40 GMT+02:00 Matei Zaharia <matei.zaha...@gmail.com>:

As long as you don't use cache(), these operations will go from disk to disk, and will only use a fixed amount of memory to build some intermediate results. However, note that because you're using groupByKey, that needs the values for each key to all fit in memory at once. In this case, since you're going to reduce right after, you should use reduceByKey, which will be more efficient. Matei

On Jun 1, 2015, at 2:21 PM, octavian.ganea <octavian.ga...@inf.ethz.ch> wrote: Dear all, does anyone know how I can force Spark to use only the disk when doing a simple flatMap(..).groupByKey.reduce(_ + _)? Thank you!

-- Octavian Ganea, Research assistant at ETH Zurich, http://da.inf.ethz.ch/people/OctavianGanea/
Re: map - reduce only with disk
Yup, exactly. All the workers will use local disk in addition to RAM, but one thing you may need to configure is the directory to use for that. It should be set through spark.local.dir. By default it's /tmp, which on some machines is also in RAM, so that could be a problem. You should set it to a folder on a real disk, or even better, a comma-separated list of disks (e.g. /mnt1,/mnt2) if you have multiple disks. Matei

On Jun 2, 2015, at 1:03 PM, Octavian Ganea <octavian.ga...@inf.ethz.ch> wrote:

Thanks a lot! So my understanding is that you call persist only if you need to use the RDD at least twice to compute different things. So, if I just need the RDD for a single scan, like in a simple flatMap(..).reduceByKey(..).saveAsTextFile(..), how do I force the slaves to use the hard disk (in addition to the RAM) when the RAM is full, and not to fail like they do now? Thank you!
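A minimal sketch of the configuration Matei describes (the paths are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

// Point shuffle/spill files at real disks rather than a RAM-backed /tmp.
val conf = new SparkConf()
  .setAppName("disk-backed-shuffle")
  .set("spark.local.dir", "/mnt1/spark,/mnt2/spark") // comma-separated list of disks
val sc = new SparkContext(conf)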
map - reduce only with disk
Dear all, does anyone know how I can force Spark to use only the disk when doing a simple flatMap(..).groupByKey.reduce(_ + _)? Thank you!
map reduce ?
Hi, I have a JavaPairRDD<String, List<Integer>> and, as an example of what I want to get:

user_id  cat1  cat2  cat3  cat4
522      0     1     2     0
62       1     0     3     0
661      1     2     0     1

Query: the users who have a number (other than 0) in both the cat2 and cat3 columns.
Answer: cat2 -> 522, 661; cat3 -> 522, 62 => user 522

How can I get this solution? I think at first I should have a JavaRDD<List<String>> of the users who appear in each column. Thank you. Best, yasemin -- hiç ender hiç
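A sketch of one way to express this query (in Scala for brevity; the column indexing and the intersection step are assumptions about the intended semantics):

import org.apache.spark.rdd.RDD

// rows: (user_id, counts per category), e.g. ("522", List(0, 1, 2, 0))
def usersNonZeroIn(rows: RDD[(String, List[Int])], col: Int): RDD[String] =
  rows.filter { case (_, cats) => cats(col) != 0 }.keys

def usersInBoth(rows: RDD[(String, List[Int])]): Array[String] = {
  val inCat2 = usersNonZeroIn(rows, 1) // cat2 is index 1
  val inCat3 = usersNonZeroIn(rows, 2) // cat3 is index 2
  inCat2.intersection(inCat3).collect() // users non-zero in both columns
}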
Re: Spark SQL vs map reduce tableInputOutput
Please take a look at https://issues.apache.org/jira/browse/PHOENIX-1815
Re: Spark SQL vs map reduce tableInputOutput
I think the recommended use would be to create a DataFrame using HBase as the source; then you can run any SQL on that DF. In Spark 1.2 you can create a base RDD and then apply a schema in the same manner.
Spark SQL vs map reduce tableInputOutput
Hi All, I am querying HBase, combining the results, and using them in my Spark job. I am querying HBase using the HBase client API inside my Spark job. Can anybody tell me whether Spark SQL will be fast enough and provide range queries? Regards, Jeetendra
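For reference, a common way to bring HBase data into Spark as an RDD is through TableInputFormat rather than per-row client calls (a sketch only; the table name and row-key range are hypothetical):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object HBaseScanExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hbase-scan"))
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")
    // Optional row-key range, serving a similar purpose to a range query.
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, "row-000")
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, "row-999")
    // Each record is (row key, full Result) scanned in parallel by region.
    val rdd = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    println(rdd.count())
    sc.stop()
  }
}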
Re: Spark SQL vs map reduce tableInputOutput
Thanks for the reply. Would using Phoenix inside Spark be useful? What is the best way to bring data from HBase into Spark in terms of application performance? Regards, Jeetendra
Re: Spark SQL vs map reduce tableInputOutput
To my knowledge, Spark SQL currently doesn't provide range scan capability against HBase. Cheers
Re: randomSplit instead of a huge map reduce ?
- Divide and conquer with reduceByKey (like Ashish mentioned, each pair being the key) would work - this looks like a map-reduce-with-combiners problem. I think reduceByKey would use combiners while aggregateByKey wouldn't.
- Could we optimize this further by using combineByKey directly?

Cheers k/
randomSplit instead of a huge map reduce ?
Hi, I am new to Spark and I think I missed something very basic. I have the following use case (I use Java and run Spark locally on my laptop): I have a JavaRDD<String[]> - the RDD contains around 72,000 arrays of strings (String[]) - each array contains 80 words (on average).

What I want to do is to convert each array into a new array/list of pairs, for example:

Input: String[] words = {"a", "b", "c"}
Output: pairs = [("a", "b"), ("a", "c"), ("b", "c")]

and then I want to count the number of times each pair appeared, so my final output should be something like:

Output: result = [("a", "b", 3), ("a", "c", 8), ("b", "c", 10)]

The problem: since each array contains around 80 words, it produces around 3,200 pairs, so after "mapping" my entire RDD I get 3,200 * 72,000 = 230,400,000 pairs to reduce, which requires way too much memory. (I know I have only around 20,000,000 unique pairs!) I already modified my code to use 'mapPartitions' instead of 'map'. It definitely improved the performance, but I still feel I'm doing something completely wrong.

I was wondering if this is the right 'Spark way' to solve this kind of problem, or whether I should do something like splitting my original RDD into smaller parts (using randomSplit), then iterating over each part, aggregating the results into some result RDD (using 'union'), and moving on to the next part.

Can anyone please explain which solution is better? Thank you very much, Shlomi.
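A sketch of the direct reduceByKey formulation of the counting step described above (in Scala for brevity; sorting the words into canonical order is an assumption, so that (a, b) and (b, a) count as the same pair):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// docs: one array of words per document
def countPairs(docs: RDD[Array[String]]): RDD[((String, String), Int)] =
  docs.flatMap { words =>
    val w = words.distinct.sorted // canonical order, one pair per combination
    for {
      i <- w.indices.iterator
      j <- (i + 1) until w.length
    } yield ((w(i), w(j)), 1)
  }.reduceByKey(_ + _) // map-side combining keeps the shuffle near the ~20M unique pairs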
Re: randomSplit instead of a huge map reduce ?
Is there a check you can put in place to not create pairs that aren't in your set of 20M pairs? Additionally, once you have your arrays converted to pairs, you can do aggregateByKey with each pair being the key.
HBase Thrift API Error on map/reduce functions
I get a serialization problem trying to run this in Python:

sc.parallelize(['1','2']).map(lambda id: client.getRow('table', id, None))

cloudpickle.py can't pickle the method_descriptor type. I added a function to pickle a method descriptor, and now it exceeds the recursion limit. I printed the method name before pickling it and it is 'reset' from cStringIO.StringO (output). The problem was at line ~830 of cloudpickle.py, trying to pickle a file. The initial object to pickle was:

(<function func at somewhere>, None, PairDeserializer(UTF8Deserializer(), UTF8Deserializer()), BatchedSerializer(PickleSerializer(), 0))

And the error is this:

File "/home/user/inverted-index.py", line 80, in <module>
    print sc.wholeTextFiles(data_dir).flatMap(update).take(2) #.groupByKey().map(store).take(2)
File "/home/user/spark2/python/pyspark/rdd.py", line 1081, in take
    totalParts = self._jrdd.partitions().size()
File "/home/user/spark2/python/pyspark/rdd.py", line 2107, in _jrdd
    pickled_command = ser.dumps(command)
File "/home/user/spark2/python/pyspark/serializers.py", line 402, in dumps
    return cloudpickle.dumps(obj, 2)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 832, in dumps
    cp.dump(obj)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 147, in dump
    raise pickle.PicklingError(msg)
pickle.PicklingError: Could not pickle object as excessively deep recursion required. Try _fast_serialization=2 or contact PiCloud support

Can any developer that works on this stuff tell me if this problem can be fixed?
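The root issue is that the closure captures a connection object that cannot be serialized. The usual workaround, sketched here in Scala (HypotheticalClient stands in for the Thrift client; the API shown is an assumption, not the real HBase Thrift interface), is to construct the client inside mapPartitions so the driver never has to serialize it:

import org.apache.spark.rdd.RDD

// Placeholder for a non-serializable client created per partition.
class HypotheticalClient {
  def getRow(table: String, id: String): String = s"$table/$id" // placeholder
}

def fetchRows(ids: RDD[String]): RDD[String] =
  ids.mapPartitions { partition =>
    val client = new HypotheticalClient() // created on the executor, never shipped
    partition.map(id => client.getRow("table", id))
  }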
Re: Can this be done in map-reduce technique (in parallel)
Hi Cheng, sorry again. In this method, I see that the values for

a <- positions.iterator
b <- positions.iterator

always remain the same. I tried b <- positions.iterator.next, and it throws an error: value filter is not a member of (Double, Double). Is there something I am missing here? Thanks
Re: Can this be done in map-reduce technique (in parallel)
Lakshmi, this is orthogonal to your question, but in case it's useful: it sounds like you're trying to determine the home location of a user, or something similar. If that's the problem statement, the data pattern may suggest a far more computationally efficient approach. For example, first map all (lat,long) pairs into geocells of a desired resolution (e.g., 10m or 100m), then count occurrences of geocells instead. There are simple libraries that map any (lat,long) pair to a geocell (string) ID very efficiently.

-- Christopher T. Nguyen, Co-founder & CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen
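A sketch of the geocell idea (assuming a naive fixed-size grid in place of a real geocell library; the ~0.001-degree cell size, roughly 100m of latitude, is an assumption and ignores longitude scaling):

import org.apache.spark.rdd.RDD

// Snap a coordinate to a coarse grid cell identified by a string ID.
def cellId(lat: Double, lon: Double): String =
  s"${math.floor(lat / 0.001).toLong}_${math.floor(lon / 0.001).toLong}"

// records: (ip, (lat, lon)); result: the most frequent cell per IP
def homeCell(records: RDD[(String, (Double, Double))]): RDD[(String, String)] =
  records
    .map { case (ip, (lat, lon)) => ((ip, cellId(lat, lon)), 1) }
    .reduceByKey(_ + _) // count occurrences of each (ip, cell)
    .map { case ((ip, cell), n) => (ip, (cell, n)) }
    .reduceByKey((a, b) => if (a._2 >= b._2) a else b) // keep the max-count cell
    .mapValues(_._1)

This avoids any pairwise distance computation, so the work stays linear in the number of records instead of quadratic per IP.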
Re: Can this be done in map-reduce technique (in parallel)
Hi Cheng, Thanks a lot. That solved my problem. Thanks again for the quick response and solution.
Can this be done in map-reduce technique (in parallel)
Hi, I am a new Spark user. Please let me know how to handle the following scenario: I have a data set with the following fields:
1. DeviceId
2. latitude
3. longitude
4. ip address
5. Datetime
6. Mobile application name

With the above data, I would like to perform the following steps:
1. Collect all lat and lon for each ipaddress: (ip1, (lat1,lon1), (lat2,lon2)), (ip2, (lat3,lon3), (lat4,lon4))
2. For each IP:
   1. find the distance between each lat/lon coordinate pair and all the other pairs under the same IP,
   2. select those coordinates whose distances fall under a specific threshold (say 100m),
   3. find the coordinate pair with the maximum occurrences.

In this case, how can I iterate and compare each coordinate pair with all the other pairs? Can this be done in a distributed manner, as this data set is going to have a few million records? Can we do this in map/reduce commands? Thanks.
Re: Can this be done in map-reduce technique (in parallel)
It is possible if you use a cartesian product to produce all possible pairs for each IP address, followed by two stages of map-reduce:
- first by pairs of points, to find the count of each pair, and
- second by IP address, to find the pair with the maximum count for each IP address.

-- Kind regards, Oleg
Re: Can this be done in map-reduce technique (in parallel)
When you group by IP address in step 1 to get this:

(ip1, (lat1,lon1), (lat2,lon2))
(ip2, (lat3,lon3), (lat4,lon4))

how many lat/lon locations do you expect for each IP address? The average and maximum are interesting. Andrew
Re: Can this be done in map-reduce technique (in parallel)
Hi Oleg/Andrew, thanks much for the prompt response. We expect thousands of lat/lon pairs for each IP address, and that is my concern with the cartesian product approach. Currently, for a small sample of this data (5000 rows), I am grouping by IP address and then computing the distance between lat/lon coordinates using array manipulation techniques. But I understand this approach is not right when the data volume goes up. My code is as follows:

val dataset: RDD[String] = sc.textFile("x.csv")
val data = dataset.map(l => l.split(","))
val grpData = data.map(r => (r(3), (r(1).toDouble, r(2).toDouble))).groupByKey()

Now I have the data grouped by IP address as Array[(String, Iterable[(Double, Double)])], e.g. Array((ip1, ArrayBuffer((lat1,lon1), (lat2,lon2), (lat3,lon3)))). Now I have to find the distance between (lat1,lon1) and (lat2,lon2), then between (lat1,lon1) and (lat3,lon3), and so on for all combinations. This is where I get stuck. Please guide me on this. Thanks again.
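A sketch of the within-group pairwise step being asked about (the distance function is a placeholder assumption; real code would use e.g. the haversine formula):

import org.apache.spark.rdd.RDD

// Placeholder distance; substitute a proper great-circle implementation.
def dist(a: (Double, Double), b: (Double, Double)): Double =
  math.sqrt(math.pow(a._1 - b._1, 2) + math.pow(a._2 - b._2, 2))

// For each IP, emit every unordered pair of its coordinates with a distance.
def pairwise(grpData: RDD[(String, Iterable[(Double, Double)])])
    : RDD[(String, ((Double, Double), (Double, Double), Double))] =
  grpData.flatMap { case (ip, points) =>
    points.toList.combinations(2).map {
      case List(p, q) => (ip, (p, q, dist(p, q)))
    }
  }

Note that this is still quadratic in the number of points per IP, which is why the geocell-counting approach suggested earlier in the thread scales better for thousands of points per address.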