Re: Join highly skewed datasets
Running this now

    ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

Waiting for it to complete. There is no progress after initial log messages

//LOGS
    $ ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
    +++ dirname ./make-distribution.sh
    ++ cd .
    ++ pwd
    + SPARK_HOME=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0
    + DISTDIR=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist
    + SPARK_TACHYON=false
    + TACHYON_VERSION=0.6.4
    + TACHYON_TGZ=tachyon-0.6.4-bin.tar.gz
    + TACHYON_URL=https://github.com/amplab/tachyon/releases/download/v0.6.4/tachyon-0.6.4-bin.tar.gz
    + MAKE_TGZ=false
    + NAME=none
    + MVN=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn
    + (( 9 ))
    + case $1 in
    + MAKE_TGZ=true
    + shift
    + (( 8 ))
    + case $1 in
    + break
    + '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/ ']'
    + '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/ ']'
    ++ command -v git
    + '[' /usr/bin/git ']'
    ++ git rev-parse --short HEAD
    ++ :
    + GITREV=
    + '[' '!' -z '' ']'
    + unset GITREV
    ++ command -v /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn
    + '[' '!' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn ']'
    ++ /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn help:evaluate -Dexpression=project.version -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
    ++ grep -v INFO
    ++ tail -n 1
//LOGS

On Sun, Jun 28, 2015 at 12:17 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I just did that, where can i find that spark-1.4.0-bin-hadoop2.4.tgz file ?

On Sun, Jun 28, 2015 at 12:15 PM, Ted Yu yuzhih...@gmail.com wrote:
You can use the following command to build Spark after applying the pull request:

    mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package

Cheers

On Sun, Jun 28, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I see that block support did not make it to spark 1.4 release. Can you share instructions of building spark with this support for hadoop 2.4.x distribution. appreciate.

On Fri, Jun 26, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
This is nice. Which version of Spark has this support ? Or do I need to build it. I have never built Spark from git, please share instructions for Hadoop 2.4.x YARN. I am struggling a lot to get a join work between 200G and 2TB datasets. I am constantly getting this exception 1000s of executors are failing with

    15/06/26 13:05:28 ERROR storage.ShuffleBlockFetcherIterator: Failed to get block(s) from phxdpehdc9dn2125.stratus.phx.ebay.com:60162
    java.io.IOException: Failed to connect to executor_host_name/executor_ip_address:60162
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

On Fri, Jun 26, 2015 at 3:20 PM, Koert Kuipers ko...@tresata.com wrote:
we went through a similar process, switching from scalding (where everything just works on large datasets) to spark (where it does not). spark can be made to work on very large datasets, it just requires a little more effort. pay attention to your storage levels (should be memory-and-disk or disk-only), number of partitions (should be large, multiple of num executors), and avoid groupByKey

also see:
https://github.com/tresata/spark-sorted (for avoiding in memory operations for certain type of reduce operations)
https://github.com/apache/spark/pull/6883 (for blockjoin)

On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
Not far at all. On large data sets everything simply fails with Spark. Worst is am not able to figure out the reason of failure, the logs run into millions of lines and i do not know the keywords to search for failure reason

On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com wrote:
How far did you get?

On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
We use Scoobi +
Re: Join highly skewed datasets
maven command needs to be passed through --mvn option.

Cheers
Re: Join highly skewed datasets
    ./make-distribution.sh --tgz --mvn -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

or

    ./make-distribution.sh --tgz --mvn -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

Both fail with

    + echo -e 'Specify the Maven command with the --mvn flag'
    Specify the Maven command with the --mvn flag
    + exit -1
Re: Join highly skewed datasets
you need 1) to publish to inhouse maven, so your application can depend on your version, and 2) use the spark distribution you compiled to launch your job (assuming you run with yarn so you can launch multiple versions of spark on same cluster)
Re: Join highly skewed datasets
I am able to use blockjoin API and it does not throw compilation error

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.blockJoin(viEvents,1,1).map { }

Here viEvents is highly skewed and both are on HDFS. What should be the optimal values of replication, i gave 1,1
Re: Join highly skewed datasets
a blockJoin spreads out one side while replicating the other. i would suggest replicating the smaller side. so if lstgItem is smaller try 3,1 or else 1,3. this should spread the fat keys out over multiple (3) executors...
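
Koert's description of blockJoin (spread one side out, replicate the other) can be approximated on a stock Spark build with a salted join. The sketch below is not the blockJoin implementation from apache/spark pull request 6883; it is a minimal illustration built only from standard RDD operations. The names `SaltedJoin`, `saltedJoin`, `salts`, and `numPartitions` are hypothetical and chosen for this example.

```scala
import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.rdd.RDD

object SaltedJoin {

  /** Inner join that spreads every key of `skewed` over `salts` sub-keys and
    * replicates every record of `other` once per sub-key, so a single hot key
    * is handled by up to `salts` reduce tasks instead of one.
    * (Hypothetical helper, not part of the Spark API.)
    */
  def saltedJoin[K: ClassTag, A: ClassTag, B: ClassTag](
      skewed: RDD[(K, A)],
      other: RDD[(K, B)],
      salts: Int,
      numPartitions: Int): RDD[(K, (A, B))] = {
    require(salts > 0, "salts must be positive")

    // Spread side: attach a random salt in [0, salts) to each record's key.
    val spread: RDD[((K, Int), A)] = skewed.mapPartitions { records =>
      val rnd = new Random()
      records.map { case (k, a) => ((k, rnd.nextInt(salts)), a) }
    }

    // Replicated side: emit one copy of each record per possible salt,
    // so every salted key on the spread side finds its match.
    val replicated: RDD[((K, Int), B)] = other.flatMap { case (k, b) =>
      (0 until salts).map(salt => ((k, salt), b))
    }

    // Join on the salted key, then drop the salt again.
    spread.join(replicated, numPartitions).map { case ((k, _), pair) => (k, pair) }
  }
}
```

Assuming both inputs are pair RDDs with the same key type, a call such as `SaltedJoin.saltedJoin(viEvents, lstgItem, salts = 3, numPartitions = 684)` would spread each viEvents key over three sub-keys at the cost of shuffling three copies of lstgItem. Which side to replicate is the trade-off discussed in this thread.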
Re: Join highly skewed datasets
You mentioned that storage levels should be memory-and-disk or disk-only, and that the number of partitions should be large (a multiple of the number of executors). How do I specify that?
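
As a rough illustration of how those two settings are usually expressed with the RDD API: the storage level is passed to `persist`, and the partition count is the second argument of `join`. The object name, the sample data, the `local[2]` master, and the value 8 are assumptions made for this sketch only; a real job would read its inputs from HDFS and take the master from spark-submit.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object JoinTuningSketch {
  def main(args: Array[String]): Unit = {
    // Local master and tiny in-memory data are placeholders for illustration.
    val conf = new SparkConf().setAppName("join-tuning-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val left: RDD[(Long, String)] = sc.parallelize(Seq(1L -> "a", 2L -> "b"))
      val right: RDD[(Long, Int)] = sc.parallelize(Seq(1L -> 10, 1L -> 11, 2L -> 20))

      // Storage level: only relevant when an RDD is reused across actions.
      // MEMORY_AND_DISK spills to disk what does not fit in memory;
      // StorageLevel.DISK_ONLY is the other option mentioned in the thread.
      right.persist(StorageLevel.MEMORY_AND_DISK)

      // Number of partitions for the shuffle: second argument of join.
      val numPartitions = 8
      val joined: RDD[(Long, (String, Int))] = left.join(right, numPartitions)

      joined.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

If an RDD is read from a file, joined once, and written straight back out, the `persist` call can be dropped, as Koert notes elsewhere in the thread.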
Re: Join highly skewed datasets
I ran this w/o maven options

    ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver

I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory. I hope this is built with 2.4.x hadoop as i did specify -P
Re: Join highly skewed datasets
How can i import this pre-built spark into my application via maven as i want to use the block join API.
Re: Join highly skewed datasets
I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

    ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver

Build was successful but the script failed. Is there a way to pass the incremented version ?

    [INFO] BUILD SUCCESS
    [INFO]
    [INFO] Total time: 09:56 min
    [INFO] Finished at: 2015-06-28T13:45:29-07:00
    [INFO] Final Memory: 84M/902M
    [INFO]
    + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist
    + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib
    + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'
    + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver
    + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
    + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
    + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
    + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main
    + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/
    + '[' 1 == 1 ']'
    + cp '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
    cp: /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar: No such file or directory
    LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver
Re: Join highly skewed datasets
You can use the following command to build Spark after applying the pull request:

    mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package

Cheers

On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
We use Scoobi + MR to perform joins and we particularly use blockJoin() API of scoobi

    /** Perform an equijoin with another distributed list where this list is considerably smaller
      * than the right (but too large to fit in memory), and where the keys of right may be
      * particularly skewed. */
    def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A, B))] =
      Relational.blockJoin(left, right)

I am trying to do a POC and what Spark join API(s) is recommended to achieve something similar ? Please suggest.

-- Deepak
Re: Join highly skewed datasets
I just did that, where can i find that spark-1.4.0-bin-hadoop2.4.tgz file ?
Re: Join highly skewed datasets
Regarding the number of executors: I get 342 executors in parallel each time, and I set executor-cores to 1. Hence I need to set 342 * 2 * x (x = 1, 2, 3, ...) as the number of partitions while running blockJoin. Is this correct? And are my assumptions about the replication levels correct? Did you get a chance to look at my processing?

On Sun, Jun 28, 2015 at 3:31 PM, Koert Kuipers ko...@tresata.com wrote:
regarding your calculation of executors... RAM in executor is not really comparable to size on disk. if you read from file and write to file you do not have to set storage level. in the join or blockJoin specify number of partitions as a multiple (say 2 times) of number of cores available to you across all executors (so not just number of executors, on yarn you can have many cores per executor).

On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
Could you please suggest and help me understand further. These are the actual sizes:

    -sh-4.1$ hadoop fs -count dw_lstg_item
    1 764 2041084436189 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00

*This is not skewed; there is exactly one entry for each item, but it's 2TB.* So should its replication be set to 1?

The two datasets (RDDs) below are unioned, and their total size is 150G. These can be skewed, and hence we use block join with Scoobi + MR. *So should its replication be set to 3?*

    -sh-4.1$ hadoop fs -count /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
    1 10173796345977 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
    -sh-4.1$ hadoop fs -count /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
    1 10185559964549 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

Also, can you suggest the number of executors to be used in this case? Each executor is started with a maximum of 14G of memory. Is it equal to 2TB + 150G (Total data) = 20150 GB/14GB = 1500 executors? Is this calculation correct?

Please also advise on the storage level (should be memory-and-disk or disk-only) and the number of partitions (should be large, multiple of num executors): https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
When do I choose this setting? (Attached is my code for reference.)

On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com wrote:
a blockJoin spreads out one side while replicating the other. i would suggest replicating the smaller side. so if lstgItem is smaller try 3,1 or else 1,3. this should spread the fat keys out over multiple (3) executors...

On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I am able to use the blockJoin API and it does not throw a compilation error:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.blockJoin(viEvents, 1, 1).map { }

Here viEvents is highly skewed and both are on HDFS. What should the optimal replication values be? I gave 1,1.

On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I incremented the version of Spark from 1.4.0 to 1.4.0.1 and ran

    ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver

The build was successful but the script failed. Is there a way to pass the incremented version?
[INFO] BUILD SUCCESS [INFO] [INFO] Total time: 09:56 min [INFO] Finished at: 2015-06-28T13:45:29-07:00 [INFO] Final Memory: 84M/902M [INFO] + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0' + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/ + '[' 1 == 1 ']' + cp '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ cp: /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar: No such file or directory LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh --tgz
Re: Join highly skewed datasets
Could you please suggest and help me understand further. These are the actual sizes:

    -sh-4.1$ hadoop fs -count dw_lstg_item
    1 764 2041084436189 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00

*This is not skewed; there is exactly one entry for each item, but it's 2TB.* So should its replication be set to 1?

The two datasets (RDDs) below are unioned, and their total size is 150G. These can be skewed, and hence we use block join with Scoobi + MR. *So should its replication be set to 3?*

    -sh-4.1$ hadoop fs -count /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
    1 10173796345977 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
    -sh-4.1$ hadoop fs -count /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
    1 10185559964549 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

Also, can you suggest the number of executors to be used in this case? Each executor is started with a maximum of 14G of memory. Is it equal to 2TB + 150G (Total data) = 20150 GB/14GB = 1500 executors? Is this calculation correct?

Please also advise on the storage level (should be memory-and-disk or disk-only) and the number of partitions (should be large, multiple of num executors): https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
When do I choose this setting? (Attached is my code for reference.)

On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com wrote:
a blockJoin spreads out one side while replicating the other. i would suggest replicating the smaller side. so if lstgItem is smaller try 3,1 or else 1,3. this should spread the fat keys out over multiple (3) executors...

On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I am able to use the blockJoin API and it does not throw a compilation error:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.blockJoin(viEvents, 1, 1).map { }

Here viEvents is highly skewed and both are on HDFS.
What should be the optimal values of replication, i gave 1,1 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver Build was successful but the script faild. Is there a way to pass the incremented version ? [INFO] BUILD SUCCESS [INFO] [INFO] Total time: 09:56 min [INFO] Finished at: 2015-06-28T13:45:29-07:00 [INFO] Final Memory: 84M/902M [INFO] + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0' + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/ + '[' 1 == 1 ']' + cp '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ cp: /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar: No such file or directory LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com wrote: you need 1) to publish to inhouse maven, so 
your application can depend on your version, and 2) use the spark distribution you compiled to launch your job (assuming you run with yarn so you can launch multiple versions of spark on same cluster) On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: How can i import this pre-built spark into my application via maven as i want to use the block join API. On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I ran this w/o maven options ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory. I hope this is built with 2.4.x hadoop as i did specify -P On Sun, Jun 28, 2015 at 1:10 PM,
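As a quick check of the executor arithmetic asked about in this message, the sketch below redoes the division using the byte count printed by `hadoop fs -count` and the stated 150G for the unioned datasets. It assumes the third column of the `-count` output is the content size in bytes, and the object and value names are illustrative only. As the reply in the thread points out, size on disk is not really comparable to executor RAM, so the result is a sanity check on the numbers, not a sizing rule.

```scala
// Sanity check of "total data / executor memory", taking the message's figures at face value.
object ExecutorArithmetic {
  val GiB: Double = 1024.0 * 1024.0 * 1024.0

  // Convert a byte count to GiB.
  def toGiB(bytes: Long): Double = bytes / GiB

  // Naive ratio of data volume to per-executor memory (not a recommendation).
  def naiveExecutorCount(totalGiB: Double, executorGiB: Double): Double =
    totalGiB / executorGiB

  def main(args: Array[String]): Unit = {
    val listingGiB  = toGiB(2041084436189L) // size reported for dw_lstg_item
    val sessionsGiB = 150.0                 // stated total of the two unioned datasets
    val totalGiB    = listingGiB + sessionsGiB
    val ratio       = naiveExecutorCount(totalGiB, executorGiB = 14.0)
    println(f"total = $totalGiB%.0f GiB, total / 14 GiB = $ratio%.0f")
  }
}
```

Under these assumptions the total comes to roughly 2,050 GiB and the ratio to about 146, an order of magnitude below the 1500 quoted in the message.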
Re: Join highly skewed datasets
specify numPartitions or partitioner for operations that shuffle. so use:

    def join[W](other: RDD[(K, W)], numPartitions: Int)

or

    def blockJoin[W](other: JavaPairRDD[K, W], leftReplication: Int, rightReplication: Int, partitioner: Partitioner)

for example:

    left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))

On Sun, Jun 28, 2015 at 5:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
You mentioned storage levels (should be memory-and-disk or disk-only) and number of partitions (should be large, multiple of num executors). How do I specify those?

On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I am able to use the blockJoin API and it does not throw a compilation error:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.blockJoin(viEvents, 1, 1).map { }

Here viEvents is highly skewed and both are on HDFS. What should the optimal replication values be? I gave 1,1.

On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I incremented the version of Spark from 1.4.0 to 1.4.0.1 and ran

    ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver

The build was successful but the script failed. Is there a way to pass the incremented version?
[INFO] BUILD SUCCESS [INFO] [INFO] Total time: 09:56 min [INFO] Finished at: 2015-06-28T13:45:29-07:00 [INFO] Final Memory: 84M/902M [INFO] + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0' + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/ + '[' 1 == 1 ']' + cp '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ cp: /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar: No such file or directory LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com wrote: you need 1) to publish to inhouse maven, so your application can depend on your version, and 2) use the spark distribution you compiled to launch your job (assuming you run with yarn so you can launch multiple versions of spark on same cluster) On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: How can i import this pre-built spark into my application via maven as i want to use the block 
join API. On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I ran this w/o maven options ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory. I hope this is built with 2.4.x hadoop as i did specify -P On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: ./make-distribution.sh --tgz --*mvn* -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package or ./make-distribution.sh --tgz --*mvn* -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package Both fail with + echo -e 'Specify the Maven command with the --mvn flag' Specify the Maven command with the --mvn flag + exit -1 -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak
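To make the "spread one side, replicate the other" idea behind the blockJoin call in this message more concrete, here is a minimal sketch in plain Scala collections. It is not the implementation from the pull request and does not use Spark: `Seq` stands in for an RDD, `groupBy` stands in for the shuffle, and the names `spread`, `replicate` and the single `blocks` parameter are assumptions made for illustration (the API in the thread takes separate left and right replication factors).

```scala
// Plain-Scala simulation of the replicate-and-spread idea behind a block join.
// Illustrative only: no Spark dependency, not the code from the pull request.
object BlockJoinSketch {
  // Spread each left record over one of `blocks` sub-keys (round-robin by position).
  def spread[K, A](left: Seq[(K, A)], blocks: Int): Seq[((K, Int), A)] =
    left.zipWithIndex.map { case ((k, a), i) => ((k, i % blocks), a) }

  // Copy each right record once per sub-key so every block can find its match.
  def replicate[K, B](right: Seq[(K, B)], blocks: Int): Seq[((K, Int), B)] =
    for ((k, b) <- right; block <- 0 until blocks) yield ((k, block), b)

  // Equi-join on the composite (key, block) key, then drop the block id again.
  def blockJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)], blocks: Int): Seq[(K, (A, B))] = {
    val rightByKey = replicate(right, blocks).groupBy(_._1)
    for {
      (kb, a) <- spread(left, blocks)
      (_, b)  <- rightByKey.getOrElse(kb, Seq.empty)
    } yield (kb._1, (a, b))
  }

  def main(args: Array[String]): Unit = {
    val events   = Seq.fill(6)(1L -> "view") :+ (2L -> "view") // key 1 is the "fat" key
    val listings = Seq(1L -> "item-1", 2L -> "item-2")
    val joined   = blockJoin(events, listings, blocks = 3)
    println(joined.size) // 7, the same row count as an ordinary equi-join
  }
}
```

In this sketch the six records for key 1 end up under three different composite keys, so a hash partitioner could place them in up to three partitions instead of one, at the cost of holding three copies of each record from the replicated side.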
Re: Join highly skewed datasets
My code:

    val viEvents = details.filter(_.get(14).asInstanceOf[Long] != NULL_VALUE).map { vi =>
      (vi.get(14).asInstanceOf[Long], vi)
    } // AVRO (150G)

    val lstgItem = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89)).filter(_.getItemId().toLong != NULL_VALUE).map { lstg =>
      (lstg.getItemId().toLong, lstg)
    } // SEQUENCE (2TB)

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      viEvents.blockJoin(lstgItem, 3, 1, new HashPartitioner(2141)).map { }

On Sun, Jun 28, 2015 at 3:03 PM, Koert Kuipers ko...@tresata.com wrote:
specify numPartitions or partitioner for operations that shuffle. so use:

    def join[W](other: RDD[(K, W)], numPartitions: Int)

or

    def blockJoin[W](other: JavaPairRDD[K, W], leftReplication: Int, rightReplication: Int, partitioner: Partitioner)

for example:

    left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))

On Sun, Jun 28, 2015 at 5:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
You mentioned storage levels (should be memory-and-disk or disk-only) and number of partitions (should be large, multiple of num executors). How do I specify those?

On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I am able to use the blockJoin API and it does not throw a compilation error:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.blockJoin(viEvents, 1, 1).map { }

Here viEvents is highly skewed and both are on HDFS. What should the optimal replication values be? I gave 1,1.

On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I incremented the version of Spark from 1.4.0 to 1.4.0.1 and ran

    ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver

The build was successful but the script failed. Is there a way to pass the incremented version?
Re: Join highly skewed datasets
I am unable to run my application or the sample application with either the prebuilt Spark 1.4 or this custom 1.4 build. In both cases I get this error:

15/06/28 15:30:07 WARN ipc.Client: Exception encountered while connecting to the server : java.lang.IllegalArgumentException: Server has invalid Kerberos principal: hadoop/r...@corp.x.com

Please let me know the correct way to specify JARs with 1.4. The command below used to work with 1.3.1.

Command:

    ./bin/spark-submit -v --master yarn-cluster \
      --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar \
      --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar \
      --num-executors 9973 --driver-memory 14g \
      --driver-java-options -XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps \
      --executor-memory 14g --executor-cores 1 --queue hdmi-others \
      --class com.ebay.ep.poc.spark.reporting.SparkApp \
      /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar \
      startDate=2015-06-20 endDate=2015-06-21 input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem buffersize=128 maxbuffersize=1068 maxResultSize=200G

On Sun, Jun 28, 2015 at 3:09 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
My code:

    val viEvents = details.filter(_.get(14).asInstanceOf[Long] != NULL_VALUE).map { vi =>
      (vi.get(14).asInstanceOf[Long], vi)
    } // AVRO (150G)

    val lstgItem = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89)).filter(_.getItemId().toLong != NULL_VALUE).map { lstg =>
      (lstg.getItemId().toLong, lstg)
    } // SEQUENCE (2TB)

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      viEvents.blockJoin(lstgItem, 3, 1, new HashPartitioner(2141)).map { }

On Sun, Jun 28, 2015 at 3:03 PM, Koert Kuipers ko...@tresata.com wrote:
specify numPartitions or partitioner for operations that shuffle. so use:

    def join[W](other: RDD[(K, W)], numPartitions: Int)

or

    def blockJoin[W](other: JavaPairRDD[K, W], leftReplication: Int, rightReplication: Int, partitioner: Partitioner)

for example:

    left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))

On Sun, Jun 28, 2015 at 5:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
You mentioned storage levels (should be memory-and-disk or disk-only) and number of partitions (should be large, multiple of num executors). How do I specify those?

On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I am able to use the blockJoin API and it does not throw a compilation error:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.blockJoin(viEvents, 1, 1).map { }

Here viEvents is highly skewed and both are on HDFS. What should the optimal replication values be? I gave 1,1.

On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I incremented the version of Spark from 1.4.0 to 1.4.0.1 and ran

    ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver

The build was successful but the script failed. Is there a way to pass the incremented version?
Re: Join highly skewed datasets
regarding your calculation of executors... RAM in executor is not really comparable to size on disk. if you read from file and write to file you do not have to set storage level. in the join or blockJoin specify number of partitions as a multiple (say 2 times) of number of cores available to you across all executors (so not just number of executors, on yarn you can have many cores per executor).

On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
Could you please suggest and help me understand further. These are the actual sizes:

    -sh-4.1$ hadoop fs -count dw_lstg_item
    1 764 2041084436189 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00

*This is not skewed; there is exactly one entry for each item, but it's 2TB.* So should its replication be set to 1?

The two datasets (RDDs) below are unioned, and their total size is 150G. These can be skewed, and hence we use block join with Scoobi + MR. *So should its replication be set to 3?*

    -sh-4.1$ hadoop fs -count /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
    1 10173796345977 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
    -sh-4.1$ hadoop fs -count /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
    1 10185559964549 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

Also, can you suggest the number of executors to be used in this case? Each executor is started with a maximum of 14G of memory. Is it equal to 2TB + 150G (Total data) = 20150 GB/14GB = 1500 executors? Is this calculation correct?

Please also advise on the storage level (should be memory-and-disk or disk-only) and the number of partitions (should be large, multiple of num executors): https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
When do I choose this setting? (Attached is my code for reference.)

On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com wrote:
a blockJoin spreads out one side while replicating the other. i would suggest replicating the smaller side.
so if lstgItem is smaller try 3,1 or else 1,3. this should spread the fat keys out over multiple (3) executors... On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I am able to use blockjoin API and it does not throw compilation error val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.blockJoin(viEvents,1,1).map { } Here viEvents is highly skewed and both are on HDFS. What should be the optimal values of replication, i gave 1,1 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver Build was successful but the script faild. Is there a way to pass the incremented version ? [INFO] BUILD SUCCESS [INFO] [INFO] Total time: 09:56 min [INFO] Finished at: 2015-06-28T13:45:29-07:00 [INFO] Final Memory: 84M/902M [INFO] + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0' + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/ + '[' 1 == 1 ']' + cp 
'/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/ cp: /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar: No such file or directory LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com wrote: you need 1) to publish to inhouse maven, so your application can depend on your version, and 2) use the spark distribution you compiled to launch your job (assuming you run with yarn so you can launch multiple versions of spark on same cluster) On Sun, Jun 28, 2015 at
Re: Join highly skewed datasets
other people might disagree, but i have had better luck with a model that looks more like traditional map-red if you use spark for disk-to-disk computations: more cores per executor (and so less RAM per core/task). so i would suggest trying --executor-cores 4 and adjust numPartitions accordingly.

On Sun, Jun 28, 2015 at 6:45 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
Regarding the number of executors: I get 342 executors in parallel each time, and I set executor-cores to 1. Hence I need to set 342 * 2 * x (x = 1, 2, 3, ...) as the number of partitions while running blockJoin. Is this correct? And are my assumptions about the replication levels correct? Did you get a chance to look at my processing?

On Sun, Jun 28, 2015 at 3:31 PM, Koert Kuipers ko...@tresata.com wrote:
regarding your calculation of executors... RAM in executor is not really comparable to size on disk. if you read from file and write to file you do not have to set storage level. in the join or blockJoin specify number of partitions as a multiple (say 2 times) of number of cores available to you across all executors (so not just number of executors, on yarn you can have many cores per executor).

On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
Could you please suggest and help me understand further. These are the actual sizes:

    -sh-4.1$ hadoop fs -count dw_lstg_item
    1 764 2041084436189 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00

*This is not skewed; there is exactly one entry for each item, but it's 2TB.* So should its replication be set to 1?

The two datasets (RDDs) below are unioned, and their total size is 150G. These can be skewed, and hence we use block join with Scoobi + MR. *So should its replication be set to 3?*

    -sh-4.1$ hadoop fs -count /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
    1 10173796345977 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
    -sh-4.1$ hadoop fs -count /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
    1 10185559964549 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

Also, can you suggest the number of executors to be used in this case? Each executor is started with a maximum of 14G of memory. Is it equal to 2TB + 150G (Total data) = 20150 GB/14GB = 1500 executors? Is this calculation correct?

Please also advise on the storage level (should be memory-and-disk or disk-only) and the number of partitions (should be large, multiple of num executors): https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
When do I choose this setting? (Attached is my code for reference.)

On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com wrote:
a blockJoin spreads out one side while replicating the other. i would suggest replicating the smaller side. so if lstgItem is smaller try 3,1 or else 1,3. this should spread the fat keys out over multiple (3) executors...

On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I am able to use the blockJoin API and it does not throw a compilation error:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.blockJoin(viEvents, 1, 1).map { }

Here viEvents is highly skewed and both are on HDFS. What should the optimal replication values be? I gave 1,1.

On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I incremented the version of Spark from 1.4.0 to 1.4.0.1 and ran

    ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver

The build was successful but the script failed. Is there a way to pass the incremented version?
[INFO] BUILD SUCCESS
[INFO]
[INFO] Total time: 09:56 min
[INFO] Finished at: 2015-06-28T13:45:29-07:00
[INFO] Final Memory: 84M/902M
[INFO]
+ rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist
+ mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib
+ echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'
+ echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver
+ cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
+ cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
+ cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
+ mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main
+ cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
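
A note on the partition-count advice in the message above: the rule of thumb given there is a multiple (say 2 times) of the total number of cores across all executors, not of the number of executors. A minimal Scala sketch of that arithmetic follows. PartitionSizing, its method names and the default multiplier are hypothetical names used only for illustration, and the executor count of 342 is simply the figure reported in the thread; the number of executors YARN grants may well change once --executor-cores is raised.

```scala
// Hypothetical helper, not part of Spark: it only encodes the rule of thumb
// "partitions = multiplier * total cores across all executors".
object PartitionSizing {

  /** Total concurrent task slots: executors times cores per executor. */
  def totalCores(numExecutors: Int, coresPerExecutor: Int): Int =
    numExecutors * coresPerExecutor

  /** Suggested partition count; the default multiplier of 2 is an assumption. */
  def numPartitions(numExecutors: Int, coresPerExecutor: Int, multiplier: Int = 2): Int =
    totalCores(numExecutors, coresPerExecutor) * multiplier
}
```

With 342 executors at 1 core each this gives 342 * 1 * 2 = 684 partitions, which matches the "342 * 2 * x" formula for x = 1. If the same 342 executors each ran 4 cores, the same rule would give 342 * 4 * 2 = 2736.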
Re: Join highly skewed datasets
Not far at all. On large data sets everything simply fails with Spark. Worst of all, I am not able to figure out the reason for the failures: the logs run into millions of lines and I do not know which keywords to search for.

On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com wrote:
How far did you get?

On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
We use Scoobi + MR to perform joins, and in particular we use the blockJoin() API of Scoobi:

/** Perform an equijoin with another distributed list where this list is considerably smaller
 * than the right (but too large to fit in memory), and where the keys of right may be
 * particularly skewed. */
def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A, B))] =
  Relational.blockJoin(left, right)

I am trying to do a POC. Which Spark join API(s) are recommended to achieve something similar? Please suggest.

-- Deepak

-- Deepak
Re: Join highly skewed datasets
we went through a similar process, switching from scalding (where everything just works on large datasets) to spark (where it does not). spark can be made to work on very large datasets, it just requires a little more effort. pay attention to your storage levels (should be memory-and-disk or disk-only), number of partitions (should be large, multiple of num executors), and avoid groupByKey.

also see:
https://github.com/tresata/spark-sorted (for avoiding in-memory operations for certain types of reduce operations)
https://github.com/apache/spark/pull/6883 (for blockjoin)

On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
Not far at all. On large data sets everything simply fails with Spark. Worst of all, I am not able to figure out the reason for the failures: the logs run into millions of lines and I do not know which keywords to search for.
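
For readers wondering what a block join does, here is a minimal sketch of the general idea as described in this thread: replicate one side and spread the other. It is written against plain Scala collections so it runs without a cluster. This is not the code from the pull request linked above. BlockJoinSketch, replicate, spread and the fixed random seed are all illustrative assumptions. On RDDs the same shape would presumably be a flatMap on the smaller side, a map on the skewed side, and a regular join on the salted key.

```scala
import scala.util.Random

// Illustrative only: plain Seq stands in for an RDD, and all names are hypothetical.
object BlockJoinSketch {

  /** Replicate every record of the smaller side once per salt value 0 until n. */
  def replicate[K, A](small: Seq[(K, A)], n: Int): Seq[((K, Int), A)] =
    small.flatMap { case (k, a) => (0 until n).map(salt => ((k, salt), a)) }

  /** Tag every record of the skewed side with one random salt, so the rows of a
    * hot key are spread over up to n distinct salted keys. */
  def spread[K, B](skewed: Seq[(K, B)], n: Int, rng: Random): Seq[((K, Int), B)] =
    skewed.map { case (k, b) => ((k, rng.nextInt(n)), b) }

  /** Equi-join on the salted key, then drop the salt from the result. */
  def blockJoin[K, A, B](small: Seq[(K, A)], skewed: Seq[(K, B)], n: Int,
                         rng: Random = new Random(42)): Seq[(K, (A, B))] = {
    // Index the replicated side by salted key.
    val bySaltedKey: Map[(K, Int), Seq[A]] =
      replicate(small, n).groupBy(_._1).map { case (sk, rows) => sk -> rows.map(_._2) }
    for {
      (sk, b) <- spread(skewed, n, rng)
      a       <- bySaltedKey.getOrElse(sk, Seq.empty)
    } yield (sk._1, (a, b))
  }
}
```

Because every salt value exists on the replicated side, the result is the same as a plain equi-join whatever salts are drawn; only the distribution of work changes. The cost is that the replicated side grows n times, which is why the advice in this thread is to replicate the smaller side.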
Re: Join highly skewed datasets
This is nice. Which version of Spark has this support? Or do I need to build it? I have never built Spark from git; please share instructions for Hadoop 2.4.x YARN.

I am struggling a lot to get a join to work between 200G and 2TB datasets. I am constantly getting this exception; 1000s of executors are failing with:

15/06/26 13:05:28 ERROR storage.ShuffleBlockFetcherIterator: Failed to get block(s) from phxdpehdc9dn2125.stratus.phx.ebay.com:60162
java.io.IOException: Failed to connect to executor_host_name/executor_ip_address:60162
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

On Fri, Jun 26, 2015 at 3:20 PM, Koert Kuipers ko...@tresata.com wrote:
we went through a similar process, switching from scalding (where everything just works on large datasets) to spark (where it does not). spark can be made to work on very large datasets, it just requires a little more effort. pay attention to your storage levels (should be memory-and-disk or disk-only), number of partitions (should be large, multiple of num executors), and avoid groupByKey.

also see:
https://github.com/tresata/spark-sorted (for avoiding in-memory operations for certain types of reduce operations)
https://github.com/apache/spark/pull/6883 (for blockjoin)

-- Deepak
Re: Join highly skewed datasets
How far did you get?

On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
We use Scoobi + MR to perform joins and we particularly use blockJoin() API of scoobi

/** Perform an equijoin with another distributed list where this list is considerably smaller
 * than the right (but too large to fit in memory), and where the keys of right may be
 * particularly skewed. */
def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A, B))] =
  Relational.blockJoin(left, right)

I am trying to do a POC and what Spark join API(s) is recommended to achieve something similar ? Please suggest.

-- Deepak