Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
Running this now

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0
-Phive -Phive-thriftserver -DskipTests clean package


Waiting for it to complete. There is no progress after the initial log messages.


//LOGS

$ ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0
-Phive -Phive-thriftserver -DskipTests clean package

+++ dirname ./make-distribution.sh

++ cd .

++ pwd

+ SPARK_HOME=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0

+ DISTDIR=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

+ SPARK_TACHYON=false

+ TACHYON_VERSION=0.6.4

+ TACHYON_TGZ=tachyon-0.6.4-bin.tar.gz

+ TACHYON_URL=
https://github.com/amplab/tachyon/releases/download/v0.6.4/tachyon-0.6.4-bin.tar.gz

+ MAKE_TGZ=false

+ NAME=none

+ MVN=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn

+ ((  9  ))

+ case $1 in

+ MAKE_TGZ=true

+ shift

+ ((  8  ))

+ case $1 in

+ break

+ '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/
']'

+ '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/
']'

++ command -v git

+ '[' /usr/bin/git ']'

++ git rev-parse --short HEAD

++ :

+ GITREV=

+ '[' '!' -z '' ']'

+ unset GITREV

++ command -v /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn

+ '[' '!' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn ']'

++ /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn help:evaluate
-Dexpression=project.version -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0
-Phive -Phive-thriftserver -DskipTests clean package

++ grep -v INFO

++ tail -n 1

//LOGS

On Sun, Jun 28, 2015 at 12:17 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I just did that, where can i find that spark-1.4.0-bin-hadoop2.4.tgz
 file ?

 On Sun, Jun 28, 2015 at 12:15 PM, Ted Yu yuzhih...@gmail.com wrote:

 You can use the following command to build Spark after applying the pull
 request:

 mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package


 Cheers


 On Sun, Jun 28, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I see that block support did not make it to spark 1.4 release.

 Can you share instructions of building spark with this support for
 hadoop 2.4.x distribution.

 appreciate.

 On Fri, Jun 26, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 This is nice. Which version of Spark has this support ? Or do I need to
 build it.
 I have never built Spark from git, please share instructions for Hadoop
 2.4.x YARN.

 I am struggling a lot to get a join work between 200G and 2TB datasets.
 I am constantly getting this exception

 1000s of executors are failing with

 15/06/26 13:05:28 ERROR storage.ShuffleBlockFetcherIterator: Failed to
 get block(s) from phxdpehdc9dn2125.stratus.phx.ebay.com:60162
 java.io.IOException: Failed to connect to
 executor_host_name/executor_ip_address:60162
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)




 On Fri, Jun 26, 2015 at 3:20 PM, Koert Kuipers ko...@tresata.com
 wrote:

 we went through a similar process, switching from scalding (where
 everything just works on large datasets) to spark (where it does not).

 spark can be made to work on very large datasets, it just requires a
 little more effort. pay attention to your storage levels (should be
 memory-and-disk or disk-only), number of partitions (should be large,
 multiple of num executors), and avoid groupByKey

 also see:
 https://github.com/tresata/spark-sorted (for avoiding in memory
 operations for certain type of reduce operations)
 https://github.com/apache/spark/pull/6883 (for blockjoin)


 On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Not far at all. On large data sets everything simply fails with
 Spark. Worst is am not able to figure out the reason of failure,  the 
 logs
 run into millions of lines and i do not know the keywords to search for
 failure reason

 On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com
 wrote:

 How far did you get?

 On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 We use Scoobi + 

Re: Join highly skewed datasets

2015-06-28 Thread Ted Yu
The Maven command needs to be passed through the --mvn option.

Cheers

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
 ./make-distribution.sh  --tgz --mvn -Phadoop-2.4 -Pyarn
-Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package


or


 ./make-distribution.sh  --tgz --mvn -Phadoop-2.4 -Pyarn
-Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

Both fail with:

+ echo -e 'Specify the Maven command with the --mvn flag'

Specify the Maven command with the --mvn flag

+ exit -1


Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
you need 1) to publish to your in-house maven repository, so your application
can depend on your version, and 2) to use the spark distribution you compiled
to launch your job (assuming you run with yarn, so you can launch multiple
versions of spark on the same cluster)
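
To illustrate step 1, a minimal sketch of what the dependency could look like for an
sbt-based application, assuming the custom build is published to an in-house repository
under the bumped version 1.4.0.1 used later in this thread; the repository name and URL
here are placeholders, not from the thread, and a Maven pom.xml dependency would use the
same groupId/artifactId/version:

  // build.sbt -- hypothetical in-house resolver and the locally published Spark version
  resolvers += "inhouse-releases" at "https://repo.example.com/releases"
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0.1" % "provided"

Step 2 is then simply pointing spark-submit at the matching distribution tarball built above.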

On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 How can i import this pre-built spark into my application via maven as i
 want to use the block join API.



Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I am able to use the blockJoin API and it does not throw a compilation error:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.blockJoin(viEvents, 1, 1).map {

  }

Here viEvents is highly skewed and both RDDs are read from HDFS.

What should be the optimal values for replication? I gave 1,1.





Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
a blockJoin spreads out one side while replicating the other. i would
suggest replicating the smaller side. so if lstgItem is smaller try 3,1 or
else 1,3. this should spread the fat keys out over multiple (3)
executors...
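
To make that concrete with the dataset names used in this thread (a sketch only:
blockJoin is the API from the spark-sorted / PR #6883 work linked earlier, not stock
Spark 1.4, and the partitioner argument follows the signature Koert posts further down):

  import org.apache.spark.HashPartitioner

  // lstgItem (~2TB, one entry per item) is the left side, viEvents (~150G, skewed) the right.
  // Replicating the smaller, skewed side 3x spreads each fat key over 3 tasks:
  val joined = lstgItem.blockJoin(viEvents, 1, 3, new HashPartitioner(numPartitions))

  // with the operands flipped, the replication factors flip too:
  // viEvents.blockJoin(lstgItem, 3, 1, new HashPartitioner(numPartitions))

Here numPartitions is whatever partition count is settled on later in the thread (a
multiple of the total core count).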






Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
You mentioned that storage levels should be memory-and-disk or disk-only, and
that the number of partitions should be large (a multiple of the number of
executors).

How do I specify that?



Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I ran this without the Maven options:

./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
-Phive-thriftserver

I got spark-1.4.0-bin-2.4.0.tgz in the same working directory.

I hope this is built against Hadoop 2.4.x, as I did specify the -P profiles.



Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
How can I import this pre-built Spark into my application via Maven, as I
want to use the block join API?



Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I incremented the version of Spark from 1.4.0 to 1.4.0.1 and ran

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
-Phive-thriftserver

The build was successful but the script failed. Is there a way to pass the
incremented version?


[INFO] BUILD SUCCESS

[INFO]


[INFO] Total time: 09:56 min

[INFO] Finished at: 2015-06-28T13:45:29-07:00

[INFO] Final Memory: 84M/902M

[INFO]


+ rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

+ mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

+ echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

+ echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

+ cp
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

+ cp
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

+ cp
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

+ mkdir -p
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

+ cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

+ '[' 1 == 1 ']'

+ cp
'/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

cp:
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
No such file or directory

LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
-Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver





Re: Join highly skewed datasets

2015-06-28 Thread Ted Yu
You can use the following command to build Spark after applying the pull
request:

mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package


Cheers


On Sun, Jun 28, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I see that block support did not make it to spark 1.4 release.

 Can you share instructions of building spark with this support for hadoop
 2.4.x distribution.

 appreciate.

 On Fri, Jun 26, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 This is nice. Which version of Spark has this support ? Or do I need to
 build it.
 I have never built Spark from git, please share instructions for Hadoop
 2.4.x YARN.

 I am struggling a lot to get a join work between 200G and 2TB datasets. I
 am constantly getting this exception

 1000s of executors are failing with

 15/06/26 13:05:28 ERROR storage.ShuffleBlockFetcherIterator: Failed to
 get block(s) from phxdpehdc9dn2125.stratus.phx.ebay.com:60162
 java.io.IOException: Failed to connect to
 executor_host_name/executor_ip_address:60162
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)




 On Fri, Jun 26, 2015 at 3:20 PM, Koert Kuipers ko...@tresata.com wrote:

 we went through a similar process, switching from scalding (where
 everything just works on large datasets) to spark (where it does not).

 spark can be made to work on very large datasets, it just requires a
 little more effort. pay attention to your storage levels (should be
 memory-and-disk or disk-only), number of partitions (should be large,
 multiple of num executors), and avoid groupByKey

 also see:
 https://github.com/tresata/spark-sorted (for avoiding in memory
 operations for certain type of reduce operations)
 https://github.com/apache/spark/pull/6883 (for blockjoin)


 On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Not far at all. On large data sets everything simply fails with Spark.
 Worst is am not able to figure out the reason of failure,  the logs run
 into millions of lines and i do not know the keywords to search for failure
 reason

 On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com
 wrote:

 How far did you get?

 On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 We use Scoobi + MR to perform joins and we particularly use
 blockJoin() API of scoobi


 /** Perform an equijoin with another distributed list where this list
 is considerably smaller
 * than the right (but too large to fit in memory), and where the keys
 of right may be
 * particularly skewed. */

  def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A,
 B))] =
 Relational.blockJoin(left, right)


 I am trying to do a POC and what Spark join API(s) is recommended to
 achieve something similar ?

 Please suggest.

 --
 Deepak





 --
 Deepak





 --
 Deepak




 --
 Deepak




Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I just did that; where can I find that spark-1.4.0-bin-hadoop2.4.tgz file?



Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
Regarding the number of executors:

I get 342 executors in parallel each time, and I set executor-cores to 1.
Hence I need to set 342 * 2 * x (x = 1, 2, 3, ...) as the number of partitions
while running blockJoin. Is this correct?

And are my assumptions on replication levels correct?

Did you get a chance to look at my processing?
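
To make that arithmetic concrete under the configuration described here (342 executors
with executor-cores 1, i.e. 342 cores in total) and the 2x-the-core-count guideline in
the reply quoted below:

  342 executors x 1 core x 2 = 684 partitions (or 1368, 2052, ... for larger multiples of x)

so something like new HashPartitioner(684) would be a starting point; as the reply notes,
it is the total core count that matters, which only coincides with the executor count here
because executor-cores is 1.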



On Sun, Jun 28, 2015 at 3:31 PM, Koert Kuipers ko...@tresata.com wrote:

 regarding your calculation of executors... RAM in executor is not really
 comparable to size on disk.

 if you read from from file and write to file you do not have to set
 storage level.

 in the join or blockJoin specify number of partitions  as a multiple (say
 2 times) of number of cores available to you across all executors (so not
 just number of executors, on yarn you can have many cores per executor).


Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
Could you please suggest and help me understand further.

These are the actual sizes:

-sh-4.1$ hadoop fs -count dw_lstg_item
   1  764  2041084436189
/sys/edw/dw_lstg_item/snapshot/2015/06/25/00
This is not skewed; there is exactly one entry for each item, but it's 2TB.
So should its replication be set to 1?

The below two datasets (RDDs) are unioned and their total size is 150G.
These can be skewed, and hence we use block join with Scoobi + MR.
So should its replication be set to 3?
-sh-4.1$ hadoop fs -count
/apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
   1  10173796345977
/apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
-sh-4.1$ hadoop fs -count
/apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
   1  10185559964549
/apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

Also, can you suggest the number of executors to be used in this case? Each
executor is started with a max of 14G of memory.

Is it equal to 2TB + 150G (total data) = 20150 GB / 14 GB = 1500 executors?
Is this calculation correct?

And please also advise on the storage level (should be memory-and-disk or
disk-only) and the number of partitions (should be large, a multiple of the
number of executors):

https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

When do I choose this setting? (Attached is my code for reference.)
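
On the storage-level part of that question, a minimal sketch of how a level is set
explicitly in Spark (the RDD name is the one from this thread); note that, as Koert
points out in his replies, a job that just reads from files and writes back to files
does not need an explicit storage level at all:

  import org.apache.spark.storage.StorageLevel

  // only needed if the RDD is reused; spills partitions to local disk instead of recomputing
  lstgItem.persist(StorageLevel.MEMORY_AND_DISK)
  // or, if memory pressure is the main concern:
  // lstgItem.persist(StorageLevel.DISK_ONLY)

These levels correspond to the choices described on the programming-guide page linked above.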




Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
specify numPartitions or partitioner for operations that shuffle.

so use:
def join[W](other: RDD[(K, W)], numPartitions: Int)

or
def blockJoin[W](
    other: JavaPairRDD[K, W],
    leftReplication: Int,
    rightReplication: Int,
    partitioner: Partitioner)

for example:
left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))
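
Putting that together with the earlier advice in the thread (large partition count, avoid
groupByKey), a sketch of how these calls line up; the RDD names are the ones from this
thread, 684 is just the 2-x-total-cores figure worked out earlier rather than a measured
value, and blockJoin itself requires the custom build or the spark-sorted library linked
earlier in the thread:

  import org.apache.spark.HashPartitioner

  val numPartitions = 684                              // ~2x the 342 cores discussed above
  val partitioner   = new HashPartitioner(numPartitions)

  // a plain join with an explicit partition count:
  val plainJoined = lstgItem.join(viEvents, numPartitions)

  // the blockJoin variant, replicating the smaller, skewed viEvents side 3x:
  val blockJoined = lstgItem.blockJoin(viEvents, 1, 3, partitioner)

  // if a per-key aggregation follows, prefer reduceByKey/aggregateByKey with an
  // explicit partitioner over groupByKey (someRdd and combine are placeholders):
  // someRdd.reduceByKey(partitioner, (a, b) => combine(a, b))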







Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
My code:

val viEvents = details.filter(_.get(14).asInstanceOf[Long] != NULL_VALUE)
  .map { vi => (vi.get(14).asInstanceOf[Long], vi) } // AVRO (150G)

val lstgItem = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89))
  .filter(_.getItemId().toLong != NULL_VALUE)
  .map { lstg => (lstg.getItemId().toLong, lstg) } // SEQUENCE (2TB)

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.blockJoin(lstgItem, 3, 1, new HashPartitioner(2141)).map {

  }
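
For completeness, a sketch of how this pipeline could end by writing straight back to
HDFS; the output path is the one passed on the spark-submit command later in this thread,
and the record formatting is a placeholder. As Koert notes elsewhere in the thread, a job
that reads from files and writes back to files like this does not need an explicit storage
level:

  viEventsWithListings
    .map { case (itemId, (detail, summary, count)) => s"$itemId\t$summary" } // placeholder formatting
    .saveAsTextFile("/user/dvasthimal/epdatasets/viewItem")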




Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I am unable to run my application or a sample application with either the prebuilt
Spark 1.4 or this custom 1.4 build. In both cases I get this error:

15/06/28 15:30:07 WARN ipc.Client: Exception encountered while connecting
to the server : java.lang.IllegalArgumentException: Server has invalid
Kerberos principal: hadoop/r...@corp.x.com


Please let me know the correct way to specify JARs with 1.4. The
command below used to work with 1.3.1.


Command:

./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar
 --num-executors 9973 --driver-memory 14g --driver-java-options
-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps --executor-memory 14g --executor-cores 1 --queue
hdmi-others --class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-06-20 endDate=2015-06-21
input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=200G



On Sun, Jun 28, 2015 at 3:09 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 My code:

 val viEvents = details.filter(_.get(14).asInstanceOf[Long] !=
 NULL_VALUE).map { vi => (vi.get(14).asInstanceOf[Long], vi) } //AVRO
 (150G)

 val lstgItem = DataUtil.getDwLstgItem(sc,
 DateUtil.addDaysToDate(startDate, -89)).filter(_.getItemId().toLong !=
 NULL_VALUE).map { lstg => (lstg.getItemId().toLong, lstg) } // SEQUENCE
 (2TB)


 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = viEvents.blockJoin(lstgItem, 3, 1, new HashPartitioner(2141)).map
 {

 }



 On Sun, Jun 28, 2015 at 3:03 PM, Koert Kuipers ko...@tresata.com wrote:

 specify numPartitions or partitioner for operations that shuffle.

 so use:
 def join[W](other: RDD[(K, W)], numPartitions: Int)

 or
 def blockJoin[W](
   other: JavaPairRDD[K, W],
   leftReplication: Int,
   rightReplication: Int,
   partitioner: Partitioner)

 for example:
 left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))
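 For example, a minimal sketch of both forms (assuming a build that includes
 the blockJoin patch from https://github.com/apache/spark/pull/6883, which
 makes blockJoin available on pair RDDs; the data and numbers below are only
 illustrative):

 import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

 val sc = new SparkContext(new SparkConf().setAppName("skew-join-sketch"))
 // toy pair RDDs standing in for the real datasets in this thread
 val left  = sc.parallelize(Seq((1L, "a"), (1L, "b"), (2L, "c")))
 val right = sc.parallelize(Seq((1L, "x"), (2L, "y"), (3L, "z")))

 // plain join: pass an explicit number of partitions for the shuffle
 val joined = left.join(right, 2000)

 // blockJoin: left/right replication factors plus an explicit partitioner
 val blockJoined = left.blockJoin(right, 3, 1, new HashPartitioner(2000))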



 On Sun, Jun 28, 2015 at 5:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 You mentioned that storage levels should be memory-and-disk or disk-only,
 and that the number of partitions should be large, a multiple of the number
 of executors.

 How do I specify that?

 On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use the blockJoin API and it does not throw a compilation error:

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents, 1, 1).map {

 }

 Here viEvents is highly skewed, and both datasets are on HDFS.

 What should the optimal replication values be? I gave 1, 1.



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 The build was successful but the script failed. Is there a way to pass the
 incremented version?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 

Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
regarding your calculation of executors... RAM in executor is not really
comparable to size on disk.

if you read from file and write to file you do not have to set a storage
level.

in the join or blockJoin, specify the number of partitions as a multiple (say 2
times) of the number of cores available to you across all executors (so not
just the number of executors; on yarn you can have many cores per executor).
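As a rough sketch of that arithmetic in Scala (the 342 executors and the 4
cores per executor come from figures mentioned elsewhere in this thread and
are used purely as example numbers):

val numExecutors     = 342                    // executors actually granted
val coresPerExecutor = 4                      // e.g. --executor-cores 4
val numPartitions    = numExecutors * coresPerExecutor * 2   // roughly 2x total cores
// then pass numPartitions to the shuffle, e.g. via new HashPartitioner(numPartitions)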

On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Could you please suggest further and help me understand.

 These are the actual sizes:

 -sh-4.1$ hadoop fs -count dw_lstg_item
1  764  2041084436189
 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00
 This one is not skewed (there is exactly one entry for each item), but it is 2TB.
 So should its replication be set to 1?

 The two datasets (RDDs) below are unioned and their total size is 150G.
 These can be skewed, and hence we use block join with Scoobi + MR.
 So should their replication be set to 3?
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
1  10173796345977
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
1  10185559964549
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

 Also, can you suggest the number of executors to be used in this case?
 Each executor is started with at most 14G of memory.

 Is it equal to 2TB + 150G (total data) = 20150 GB / 14 GB = 1500 executors?
 Is this calculation correct?

 And also, please advise on the storage level (should it be memory-and-disk
 or disk-only?) and the number of partitions (should be large, a multiple of
 the number of executors):


 https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

 When do I choose this setting? (My code is attached for reference.)



 On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com wrote:

 a blockJoin spreads out one side while replicating the other. i would
 suggest replicating the smaller side. so if lstgItem is smaller try 3,1
 or else 1,3. this should spread the fat keys out over multiple (3)
 executors...
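 In code, this might look roughly like the following sketch (mirroring the
 lstgItem / viEvents names used below, with numPartitions defined as discussed
 elsewhere in the thread):

 import org.apache.spark.HashPartitioner

 // if lstgItem is the smaller side, replicate it (3, 1):
 val a = lstgItem.blockJoin(viEvents, 3, 1, new HashPartitioner(numPartitions))
 // otherwise (here viEvents at ~150G is smaller than lstgItem at ~2TB), use (1, 3):
 val b = lstgItem.blockJoin(viEvents, 1, 3, new HashPartitioner(numPartitions))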


 On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use the blockJoin API and it does not throw a compilation error:

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents, 1, 1).map {

 }

 Here viEvents is highly skewed, and both datasets are on HDFS.

 What should the optimal replication values be? I gave 1, 1.



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 The build was successful but the script failed. Is there a way to pass the
 incremented version?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
 No such file or directory

 LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
 -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver



 On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com
 wrote:

 you need 1) to publish to inhouse maven, so your application can
 depend on your version, and 2) use the spark distribution you compiled to
 launch your job (assuming you run with yarn so you can launch multiple
 versions of spark on same cluster)

 On Sun, Jun 28, 2015 at 

Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
other people might disagree, but i have had better luck with a model that
looks more like traditional map-red if you use spark for disk-to-disk
computations: more cores per executor (and so less RAM per core/task). so i
would suggest trying --executor-cores 4 and adjusting numPartitions
accordingly.
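For illustration, the equivalent executor settings could also be set on the
SparkConf when constructing the context (a sketch only; the 14g value echoes
the submit command earlier in the thread):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("skew-join")
  .set("spark.executor.cores", "4")     // more cores per executor, as suggested above
  .set("spark.executor.memory", "14g")
val sc = new SparkContext(conf)
// numPartitions should then be scaled to roughly 2x (executors granted) * 4 cores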

On Sun, Jun 28, 2015 at 6:45 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Regarding # of executors.

 I get 342 executors in parallel each time, and I set executor-cores to 1.
 Hence I need to set 342 * 2 * x (x = 1, 2, 3, ...) as the number of partitions
 while running blockJoin. Is this correct?

 And are my assumptions on replication levels correct?

 Did you get a chance to look at my processing?



 On Sun, Jun 28, 2015 at 3:31 PM, Koert Kuipers ko...@tresata.com wrote:

 regarding your calculation of executors... RAM in executor is not really
 comparable to size on disk.

 if you read from file and write to file you do not have to set a
 storage level.

 in the join or blockJoin, specify the number of partitions as a multiple (say
 2 times) of the number of cores available to you across all executors (so not
 just the number of executors; on yarn you can have many cores per executor).

 On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Could you please suggest further and help me understand.

 These are the actual sizes:

 -sh-4.1$ hadoop fs -count dw_lstg_item
1  764  2041084436189
 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00
 This one is not skewed (there is exactly one entry for each item), but it
 is 2TB.
 So should its replication be set to 1?

 The two datasets (RDDs) below are unioned and their total size is 150G.
 These can be skewed, and hence we use block join with Scoobi + MR.
 So should their replication be set to 3?
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
1  10173796345977
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
1  10185559964549
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

 Also, can you suggest the number of executors to be used in this case?
 Each executor is started with at most 14G of memory.

 Is it equal to 2TB + 150G (total data) = 20150 GB / 14 GB = 1500 executors?
 Is this calculation correct?

 And also, please advise on the storage level (should it be memory-and-disk
 or disk-only?) and the number of partitions (should be large, a multiple of
 the number of executors):


 https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

 When do I choose this setting? (My code is attached for reference.)



 On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com
 wrote:

 a blockJoin spreads out one side while replicating the other. i would
 suggest replicating the smaller side. so if lstgItem is smaller try
 3,1 or else 1,3. this should spread the fat keys out over multiple (3)
 executors...


 On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use the blockJoin API and it does not throw a compilation error:

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents, 1, 1).map {

 }

 Here viEvents is highly skewed, and both datasets are on HDFS.

 What should the optimal replication values be? I gave 1, 1.



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 The build was successful but the script failed. Is there a way to pass the
 incremented version?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 

Re: Join highly skewed datasets

2015-06-26 Thread ๏̯͡๏
Not far at all. On large data sets everything simply fails with Spark.
Worst is, I am not able to figure out the reason for the failure: the logs run
into millions of lines and I do not know the keywords to search for the
failure reason.

On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com wrote:

 How far did you get?

 On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 We use Scoobi + MR to perform joins, and we particularly use the blockJoin()
 API of Scoobi:


 /** Perform an equijoin with another distributed list where this list is
 considerably smaller
 * than the right (but too large to fit in memory), and where the keys of
 right may be
 * particularly skewed. */

  def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A, B))] =
 Relational.blockJoin(left, right)


 I am trying to do a POC; what Spark join API(s) would be recommended to
 achieve something similar?

 Please suggest.

 --
 Deepak





-- 
Deepak


Re: Join highly skewed datasets

2015-06-26 Thread Koert Kuipers
we went through a similar process, switching from scalding (where
everything just works on large datasets) to spark (where it does not).

spark can be made to work on very large datasets, it just requires a little
more effort. pay attention to your storage levels (should be
memory-and-disk or disk-only), number of partitions (should be large,
multiple of num executors), and avoid groupByKey

also see:
https://github.com/tresata/spark-sorted (for avoiding in memory operations
for certain type of reduce operations)
https://github.com/apache/spark/pull/6883 (for blockjoin)
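As a small illustrative sketch of those three points (paths and numbers are
made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("large-join-sketch"))

// large, explicit partition count and a disk-backed storage level
val pairs = sc.textFile("/path/to/input", 4000)
  .map(line => (line.split("\t")(0), 1L))
  .persist(StorageLevel.MEMORY_AND_DISK)   // or StorageLevel.DISK_ONLY

// prefer reduceByKey (map-side combine) over groupByKey, with an explicit partition count
val counts = pairs.reduceByKey(_ + _, 4000)
counts.saveAsTextFile("/path/to/output")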


On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Not far at all. On large data sets everything simply fails with Spark.
 Worst is, I am not able to figure out the reason for the failure: the logs run
 into millions of lines and I do not know the keywords to search for the
 failure reason.

 On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com
 wrote:

 How far did you get?

 On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 We use Scoobi + MR to perform joins, and we particularly use the blockJoin()
 API of Scoobi:


 /** Perform an equijoin with another distributed list where this list is
 considerably smaller
 * than the right (but too large to fit in memory), and where the keys of
 right may be
 * particularly skewed. */

  def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A, B))]
 =
 Relational.blockJoin(left, right)


 I am trying to do a POC; what Spark join API(s) would be recommended to
 achieve something similar?

 Please suggest.

 --
 Deepak





 --
 Deepak




Re: Join highly skewed datasets

2015-06-26 Thread ๏̯͡๏
This is nice. Which version of Spark has this support? Or do I need to
build it?
I have never built Spark from git; please share instructions for Hadoop
2.4.x YARN.

I am struggling a lot to get a join to work between 200G and 2TB datasets. I
am constantly getting this exception:

1000s of executors are failing with

15/06/26 13:05:28 ERROR storage.ShuffleBlockFetcherIterator: Failed to get
block(s) from phxdpehdc9dn2125.stratus.phx.ebay.com:60162
java.io.IOException: Failed to connect to
executor_host_name/executor_ip_address:60162
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)




On Fri, Jun 26, 2015 at 3:20 PM, Koert Kuipers ko...@tresata.com wrote:

 we went through a similar process, switching from scalding (where
 everything just works on large datasets) to spark (where it does not).

 spark can be made to work on very large datasets, it just requires a
 little more effort. pay attention to your storage levels (should be
 memory-and-disk or disk-only), number of partitions (should be large,
 multiple of num executors), and avoid groupByKey

 also see:
 https://github.com/tresata/spark-sorted (for avoiding in memory
 operations for certain type of reduce operations)
 https://github.com/apache/spark/pull/6883 (for blockjoin)


 On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Not far at all. On large data sets everything simply fails with Spark.
 Worst is, I am not able to figure out the reason for the failure: the logs run
 into millions of lines and I do not know the keywords to search for the
 failure reason.

 On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com
 wrote:

 How far did you get?

 On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 We use Scoobi + MR to perform joins, and we particularly use the blockJoin()
 API of Scoobi:


 /** Perform an equijoin with another distributed list where this list
 is considerably smaller
 * than the right (but too large to fit in memory), and where the keys
 of right may be
 * particularly skewed. */

  def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A,
 B))] =
 Relational.blockJoin(left, right)


 I am trying to do a POC; what Spark join API(s) would be recommended to
 achieve something similar?

 Please suggest.

 --
 Deepak





 --
 Deepak





-- 
Deepak


Re: Join highly skewed datasets

2015-06-15 Thread Night Wolf
How far did you get?

On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 We use Scoobi + MR to perform joins, and we particularly use the blockJoin()
 API of Scoobi:


 /** Perform an equijoin with another distributed list where this list is
 considerably smaller
 * than the right (but too large to fit in memory), and where the keys of
 right may be
 * particularly skewed. */

  def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A, B))] =
 Relational.blockJoin(left, right)


 I am trying to do a POC; what Spark join API(s) would be recommended to
 achieve something similar?

 Please suggest.

 --
 Deepak