Re: Spark heap issues

2013-12-06 Thread learner1014 all
Still see a whole lot of the following errors:
java.lang.OutOfMemoryError: Java heap space
13/12/05 16:04:13 INFO executor.StandaloneExecutorBackend: Got assigned
task 553
13/12/05 16:04:13 INFO executor.Executor: Running task ID 553

The issue seems to be that the process hangs because it is stuck performing
back-to-back full GC cycles...
1536.617: [Full GC 1536.617: [CMS: 707839K->707839K(707840K), 5.0507000 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 5.0507940 secs] [Times: user=4.94 sys=0.00, real=5.05 secs]
1541.669: [Full GC 1541.669: [CMS: 707840K->707839K(707840K), 4.5483600 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.5484390 secs] [Times: user=4.47 sys=0.00, real=4.55 secs]
1546.218: [Full GC 1546.218: [CMS: 707839K->707839K(707840K), 4.5937460 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.5938460 secs] [Times: user=4.59 sys=0.00, real=4.60 secs]
1550.812: [Full GC 1550.812: [CMS: 707839K->707839K(707840K), 5.3572370 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 5.3573840 secs] [Times: user=5.26 sys=0.01, real=5.35 secs]
1556.171: [Full GC 1556.171: [CMS: 707840K->694574K(707840K), 4.1462520 secs] 1014528K->860511K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.1463350 secs] [Times: user=4.13 sys=0.00, real=4.15 secs]
1560.329: [GC [1 CMS-initial-mark: 694574K(707840K)] 874378K(1014528K), 0.4269160 secs] [Times: user=0.41 sys=0.00, real=0.43 secs]


I tried the following parameters and they do not seem to help:
  System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
  System.setProperty("spark.akka.timeout", "30")  // in seconds

  System.setProperty("spark.executor.memory", "15g")
  System.setProperty("spark.akka.frameSize", "2000")  // in MB
  System.setProperty("spark.akka.threads", "8")

Thanks


On Thu, Dec 5, 2013 at 11:31 PM, purav aggarwal
puravaggarwal...@gmail.com wrote:

 Try allocating some more resources to your application.
 You seem to be using 512 MB for your worker node (you can verify that from
 the master UI).

 Try putting the following settings into your code and see if it helps:

 System.setProperty("spark.executor.memory", "15g")   // Will allocate more
 memory
 System.setProperty("spark.akka.frameSize", "2000")
 System.setProperty("spark.akka.threads", "16")   // Dependent upon the
 number of cores on your worker machine


 On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all learner1...@gmail.com wrote:

 Hi,

 Trying to do a join operation on an RDD; my input is pipe-delimited data
 and there are 2 files.
 One file is 24 MB and the other file is 285 MB.
 The setup being used is the single-node (server) setup: SPARK_MEM set to 512m

 Master
 /pkg/java/jdk1.7.0_11/bin/java -cp
 :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 -Dspark.boundedMemoryCache.memoryFraction=0.4
 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
 -Djava.library.path= -Xms512m -Xmx512m
 org.apache.spark.deploy.master.Master --ip localhost --port 7077
 --webui-port 8080

 Worker
 /pkg/java/jdk1.7.0_11/bin/java -cp
 :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 -Dspark.boundedMemoryCache.memoryFraction=0.4
 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
 -Djava.library.path= -Xms512m -Xmx512m
 org.apache.spark.deploy.worker.Worker spark://localhost:7077


 App
 /pkg/java/jdk1.7.0_11/bin/java -cp
 :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes
 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 -Dspark.boundedMemoryCache.memoryFraction=0.4
 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
 -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend
 akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4


 Here is the code
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.storage.StorageLevel

 object SimpleApp {

   def main (args: Array[String]) {


   System.setProperty("spark.local.dir", "/spark-0.8.0-incubating-bin-cdh4/tmp");
   System.setProperty("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer")
   System.setProperty("spark.akka.timeout", "30")  // in seconds

   val dataFile2 = "/tmp_data/data1.txt"
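
 For reference, a minimal sketch of what such a pipe-delimited join might look
 like under Spark 0.8 (the second file path, the key position, and the app name
 are illustrative assumptions rather than the poster's actual code):

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._

 object PipeJoinSketch {
   def main(args: Array[String]) {
     val sc = new SparkContext("spark://localhost:7077", "PipeJoinSketch")

     // Key each file on its first pipe-delimited field; keep the rest as the value.
     def keyed(path: String) =
       sc.textFile(path).map(_.split('|')).map(f => (f(0), f.drop(1).mkString("|")))

     val small = keyed("/tmp_data/data1.txt")   // the ~24 MB file
     val large = keyed("/tmp_data/data2.txt")   // the ~285 MB file (hypothetical path)

     // join yields (key, (valueFromSmall, valueFromLarge)) pairs.
     val joined = small.join(large)
     println(joined.count())
     sc.stop()
   }
 }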
   

Spark Import Issue

2013-12-06 Thread Garrett Hamers
Hello,

I am new to the Spark system, and I am trying to write a simple program to
get myself familiar with how Spark works. I am currently having a problem
importing the Spark package. I am getting the following compiler
error: package org.apache.spark.api.java does not exist.

I have spark-0.8.0-incubating installed. I ran the commands sbt/sbt compile,
sbt/sbt assembly, and sbt/sbt publish-local without any errors. My sql.java
file is located in the spark-0.8.0-incubating root directory. I tried to
compile the code using "javac sql.java" and "javac -cp
assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating*.jar
sql.java".

Here is the code for sql.java:

package shark;

import java.io.Serializable;

import java.util.List;

import java.io.*;

import org.apache.spark.api.java.*; //Issue is here

public class sql implements Serializable {

  public static void main( String[] args) {

System.out.println("Hello World");

  }

}


 What do I need to do in order for Java to import the Spark code properly?
Any advice would be greatly appreciated.

Thank you,
Garrett Hamers


Cluster not accepting jobs

2013-12-06 Thread Nathan Kronenfeld
Hi, all.

I'm trying to connect to a remote cluster from my machine, using spark
0.7.3.  In conf/spark-env.sh, I've set MASTER, SCALA_HOME, SPARK_MASTER_IP,
and SPARK_MASTER_PORT.

When I try to run a job, it starts, but never gets anywhere, and I keep
getting the following error message:

13/12/06 13:37:20 WARN cluster.ClusterScheduler: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered


I look at the cluster UI in a browser, and it says it has 8 workers
registered, all alive.

What does this error mean?  I assume I'm missing something in the setup -
does anyone know what?

Thanks in advance,
   -Nathan Kronenfeld


Re: Cluster not accepting jobs

2013-12-06 Thread Nathan Kronenfeld
Never mind, I figured it out - apparently it was different DNS resolutions
locally and within the cluster; when I use the IP address instead of the
machine name in MASTER, it all seems to work.


On Fri, Dec 6, 2013 at 1:38 PM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 Hi, all.

 I'm trying to connect to a remote cluster from my machine, using spark
 0.7.3.  In conf/spark-env.sh, I've set MASTER, SCALA_HOME, SPARK_MASTER_IP,
 and SPARK_MASTER_PORT.

 When I try to run a job, it starts, but never gets anywhere, and I keep
 getting the following error message:

 13/12/06 13:37:20 WARN cluster.ClusterScheduler: Initial job has not
 accepted any resources; check your cluster UI to ensure that workers are
 registered


 I look at the cluster UI in a browser, and it says it has 8 workers
 registered, all alive.

 What does this error mean?  I assume I'm missing something in the setup -
 does anyone know what?

 Thanks in advance,
-Nathan Kronenfeld





-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Cluster not accepting jobs

2013-12-06 Thread Matei Zaharia
Yeah, in general, make sure you use exactly the same “cluster URL” string shown 
on the master’s web UI. There’s currently a limitation in Akka where different 
ways of specifying the hostname won’t work.
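
For illustration, a minimal sketch of passing that exact string when creating the
context (the IP, port, and app name are made up; under Spark 0.7.x the import
would be spark.SparkContext instead of org.apache.spark.SparkContext):

import org.apache.spark.SparkContext

object ExactMasterUrl {
  def main(args: Array[String]) {
    // Use the cluster URL byte-for-byte as it appears on the master web UI,
    // e.g. the raw IP if that is what the UI displays.
    val sc = new SparkContext("spark://10.1.1.1:7077", "ExactMasterUrl")
    println(sc.parallelize(1 to 100).count())
    sc.stop()
  }
}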

Matei

On Dec 6, 2013, at 10:54 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com 
wrote:

 Never mind, I figured it out - apparently it was different DNS resolutions 
 locally and within the cluster; when I use the IP address instead of the 
 machine name in MASTER, it all seems to work.
 
 
 On Fri, Dec 6, 2013 at 1:38 PM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:
 Hi, all.
 
 I'm trying to connect to a remote cluster from my machine, using spark 0.7.3. 
  In conf/spark-env.sh, I've set MASTER, SCALA_HOME, SPARK_MASTER_IP, and 
 SPARK_MASTER_PORT.
 
 When I try to run a job, it starts, but never gets anywhere, and I keep 
 getting the following error message:
 
 13/12/06 13:37:20 WARN cluster.ClusterScheduler: Initial job has not accepted 
 any resources; check your cluster UI to ensure that workers are registered
 
 I look at the cluster UI in a browser, and it says it has 8 workers 
 registered, all alive.
 
 What does this error mean?  I assume I'm missing something in the setup - 
 does anyone know what?
 
 Thanks in advance,
-Nathan Kronenfeld
 
 
 
 
 
 -- 
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com



Spark 0.8.0 Compiling issues

2013-12-06 Thread Gustavo Enrique Salazar Torres
Hi there:

I've been trying to compile using sbt/sbt assembly and mvn clean package (with
memory adjustments as suggested here:
http://spark.incubator.apache.org/docs/latest/building-with-maven.html).
Unfortunately, compilation fails for both of them with the following error
(here it is with Maven,
but with SBT the error happens at the same class):

[INFO] Using incremental compilation
[INFO] 'compiler-interface' not yet compiled for Scala 2.9.3. Compiling...
[INFO]   Compilation completed in 17.554 s
[INFO] Compiling 258 Scala sources and 16 Java sources to
/home/gustavo/tools/spark-0.8.0-incubating/core/target/scala-2.9.3/classes...
[WARNING]
/home/gustavo/tools/spark-0.8.0-incubating/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala:129:
method cleanupJob in class OutputCommitter is deprecated: see corresponding
Javadoc for more information.
[WARNING] getOutputCommitter().cleanupJob(getJobContext())
[WARNING]  ^
[WARNING]
/home/gustavo/tools/spark-0.8.0-incubating/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala:592:
method cleanupJob in class OutputCommitter is deprecated: see corresponding
Javadoc for more information.
[WARNING] jobCommitter.cleanupJob(jobTaskContext)
[WARNING]  ^
[ERROR] File name too long
[WARNING] two warnings found
[ERROR] one error found

Is 0.8.0 ready for production? Is 0.7.0 more stable?
I'm running on Java 6.

Cheers
Gustavo


Re: Spark 0.8.0 Compiling issues

2013-12-06 Thread Matei Zaharia
Yeah, unfortunately the reason it pops up more in 0.8.0 is because our package 
names got longer! But if you just do the build in /tmp it will work.

On Dec 6, 2013, at 11:35 AM, Josh Rosen rosenvi...@gmail.com wrote:

 This isn't a Spark 0.8.0-specific problem.  I googled for "sbt error
 filename too long" and found a couple of links that suggest that this error
 may crop up for Linux users with encrypted filesystems or home directories:
 
 http://stackoverflow.com/questions/8404815/how-do-i-build-a-project-that-uses-sbt-as-its-build-system
 https://github.com/sbt/sbt-assembly/issues/69
 
 Try one of the workarounds from those links, such as storing the build target 
 directory on an unencrypted volume.
 
 
 On Fri, Dec 6, 2013 at 11:25 AM, Gustavo Enrique Salazar Torres 
 gsala...@ime.usp.br wrote:
 Hi there:
 
 I've trying to compile using sbt/sbt assembly and mvn clean package (with 
 memory adjustments as suggested here 
 http://spark.incubator.apache.org/docs/latest/building-with-maven.html).
 Unfortunately, compiling fails for both of them with the following error 
 (here is with Maven
 but with SBT the error happens at the same class):
 
 [INFO] Using incremental compilation
 [INFO] 'compiler-interface' not yet compiled for Scala 2.9.3. Compiling...
 [INFO]   Compilation completed in 17.554 s
 [INFO] Compiling 258 Scala sources and 16 Java sources to 
 /home/gustavo/tools/spark-0.8.0-incubating/core/target/scala-2.9.3/classes...
 [WARNING] 
 /home/gustavo/tools/spark-0.8.0-incubating/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala:129:
  method cleanupJob in class OutputCommitter is deprecated: see corresponding 
 Javadoc for more information.
 [WARNING] getOutputCommitter().cleanupJob(getJobContext())
 [WARNING]  ^
 [WARNING] 
 /home/gustavo/tools/spark-0.8.0-incubating/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala:592:
  method cleanupJob in class OutputCommitter is deprecated: see corresponding 
 Javadoc for more information.
 [WARNING] jobCommitter.cleanupJob(jobTaskContext)
 [WARNING]  ^
 [ERROR] File name too long
 [WARNING] two warnings found
 [ERROR] one error found
 
 Is 0.8.0 ready for production? is 0.7.0 more stable?
 I'm running on java 6.
 
 Cheers
 Gustavo
 



Re: Pre-build Spark for Windows 8.1

2013-12-06 Thread Matei Zaharia
Hey Andrew, unfortunately I don’t know how easy this is. Maybe future versions 
of Akka will have it. We can certainly ask them to do it in general, but I imagine 
there are some use cases where they wanted this behavior.

Matei

On Dec 5, 2013, at 2:49 PM, Andrew Ash and...@andrewash.com wrote:

 Speaking of akka and host sensitivity...  How much have you hacked on akka to 
 get it to support all of: myhost.mydomain.int, myhost, and 10.1.1.1?  It's 
 kind of a pain to get the Spark URL to exactly match.  I'm wondering if there 
 are usability gains that could be made here or if we're pretty stuck.
 
 
 On Thu, Dec 5, 2013 at 2:43 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Hi,
 
 When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the URL 
 (uppercase instead of lowercase). Unfortunately Akka is very specific about 
 seeing hostnames written in the same way on each node, or else it thinks the 
 message is for another machine!
 
 Matei
 
 On Dec 5, 2013, at 8:27 AM, Adrian Bonar adrian.bo...@microsoft.com wrote:
 
 The master starts up now as expected but the workers are unable to connect 
 to the master. It looks like the master is refusing the connection messages 
 but I’m not sure why. The first two error lines below are from trying to 
 connect a worker from a separate machine and the last two error lines are 
 from trying to connect a worker on the same machine as the master. I 
 verified that the workers do not show up in the master’s web ui.
  
 MASTER:
 D:\spark>spark-class org.apache.spark.deploy.master.Master
 13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
 13/12/05 08:08:34 INFO master.Master: Starting Spark master at 
 spark://ADRIBONA-DEV-1:7077
 13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/metrics/master/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/metrics/applications/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/static,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/app/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/app,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{*,null}
 13/12/05 08:08:34 INFO server.AbstractConnector: Started 
 SelectChannelConnector@0.0.0.0:8088
 13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at 
 http://ADRIBONA-DEV-1:8088
 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message 
 RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1)
  for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master 
 at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
 akka://sparkMaster@ADRIBONA-DEV-1:7077
 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message 
 DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
  for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at 
 akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
 akka://sparkMaster@ADRIBONA-DEV-1:7077
 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message 
 RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1)
  for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master 
 at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
 akka://sparkMaster@ADRIBONA-DEV-1:7077
 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message 
 DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
  for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at 
 akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
 akka://sparkMaster@ADRIBONA-DEV-1:7077
  
 WORKER:
 D:\spark>spark-class.cmd org.apache.spark.deploy.worker.Worker 
 spark://adribona-dev-1:7077
 13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
 13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker 
 ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM
 13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark
 13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/metrics/json,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/static,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/log,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/logPage,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 

Re: Spark 0.8.0 Compiling issues

2013-12-06 Thread Gustavo Enrique Salazar Torres
Thanks! I will do that.

Cheers
Gustavo


On Fri, Dec 6, 2013 at 5:53 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Yeah, unfortunately the reason it pops up more in 0.8.0 is because our
 package names got longer! But if you just do the build in /tmp it will work.

 On Dec 6, 2013, at 11:35 AM, Josh Rosen rosenvi...@gmail.com wrote:

 This isn't a Spark 0.8.0-specific problem.  I googled for "sbt error
 filename too long" and found a couple of links that suggest that this error
 may crop up for Linux users with encrypted filesystems or home directories:


 http://stackoverflow.com/questions/8404815/how-do-i-build-a-project-that-uses-sbt-as-its-build-system
 https://github.com/sbt/sbt-assembly/issues/69

 Try one of the workarounds from those links, such as storing the build
 target directory on an unencrypted volume.


 On Fri, Dec 6, 2013 at 11:25 AM, Gustavo Enrique Salazar Torres 
 gsala...@ime.usp.br wrote:

 Hi there:

 I've trying to compile using sbt/sbt assembly and mvn clean package (with
 memory adjustments as suggested here
 http://spark.incubator.apache.org/docs/latest/building-with-maven.html).
 Unfortunately, compiling fails for both of them with the following error
 (here is with Maven
 but with SBT the error happens at the same class):

 [INFO] Using incremental compilation
 [INFO] 'compiler-interface' not yet compiled for Scala 2.9.3. Compiling...
 [INFO]   Compilation completed in 17.554 s
 [INFO] Compiling 258 Scala sources and 16 Java sources to
 /home/gustavo/tools/spark-0.8.0-incubating/core/target/scala-2.9.3/classes...
 [WARNING]
 /home/gustavo/tools/spark-0.8.0-incubating/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala:129:
 method cleanupJob in class OutputCommitter is deprecated: see corresponding
 Javadoc for more information.
 [WARNING] getOutputCommitter().cleanupJob(getJobContext())
 [WARNING]  ^
 [WARNING]
 /home/gustavo/tools/spark-0.8.0-incubating/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala:592:
 method cleanupJob in class OutputCommitter is deprecated: see corresponding
 Javadoc for more information.
 [WARNING] jobCommitter.cleanupJob(jobTaskContext)
 [WARNING]  ^
 [ERROR] File name too long
 [WARNING] two warnings found
 [ERROR] one error found

 Is 0.8.0 ready for production? is 0.7.0 more stable?
 I'm running on java 6.

 Cheers
 Gustavo






Writing an RDD to Hive

2013-12-06 Thread Philip Ogren
I have a simple scenario that I'm struggling to implement.  I would like 
to take a fairly simple RDD generated from a large log file, perform 
some transformations on it, and write the results out such that I can 
perform a Hive query either from Hive (via Hue) or Shark.  I'm having 
troubles with the last step.  I am able to write my data out to HDFS and 
then execute a Hive create table statement followed by a load data 
statement as a separate step.  I really dislike this separate manual 
step and would like to be able to have it all accomplished in my Spark 
application.  To this end, I have investigated two possible approaches 
as detailed below - it's probably too much information so I'll ask my 
more basic question first:


Does anyone have a basic recipe/approach for loading data in an RDD to a 
Hive table from a Spark application?


1) Load it into HBase via PairRDDFunctions.saveAsHadoopDataset. There is 
a nice detailed email on how to do this here 
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201311.mbox/%3ccacyzca3askwd-tujhqi1805bn7sctguaoruhd5xtxcsul1a...@mail.gmail.com%3E. 
I didn't get very far, though, because as soon as I added an hbase 
dependency (corresponding to the version of hbase we are running) to my 
pom.xml file, I had an slf4j dependency conflict that caused my current 
application to explode.  I tried the latest released version and the 
slf4j dependency problem went away but then the deprecated class 
TableOutputFormat no longer exists.  Even if loading the data into hbase 
were trivially easy (and the detailed email suggests otherwise) I would 
then need to query HBase from Hive which seems a little clunky.


2) So, I decided that Shark might be an easier option.  All the examples 
provided in their documentation seem to assume that you are using Shark 
as an interactive application from a shell.  Various threads I've seen 
seem to indicate that Shark isn't really intended to be used as 
dependency in your Spark code (see this 
https://groups.google.com/forum/#%21topic/shark-users/DHhslaOGPLg/discussion 
and that 
https://groups.google.com/forum/#%21topic/shark-users/2_Ww1xlIgvo/discussion.) 
It follows then that one can't add a Shark dependency to a pom.xml file 
because Shark isn't released via Maven Central (as far as I can tell; 
perhaps it's in some other repo?).  Of course, there are ways of creating 
a local dependency in maven but it starts to feel very hacky.


I realize that I've given sufficient detail to expose my ignorance in a 
myriad of ways.  Please feel free to shine light on any of my 
misconceptions!


Thanks,
Philip
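
For reference, a rough sketch of the two-step flow described above (write from
Spark, then expose the output to Hive). The paths, delimiter, field positions and
table name are illustrative assumptions, not Philip's actual code; using an
external table over the Spark output directory at least avoids the separate
LOAD DATA step:

import org.apache.spark.SparkContext

object WriteForHiveSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://master:7077", "WriteForHiveSketch")

    // Transform the raw log into tab-delimited rows and write them to HDFS.
    val rows = sc.textFile("hdfs:///data/raw/access.log")
      .map(_.split(" "))
      .filter(_.length >= 2)
      .map(f => f(0) + "\t" + f(1))
    rows.saveAsTextFile("hdfs:///warehouse/mylogs")
    sc.stop()

    // Then, once, from Hive or Shark (no LOAD DATA needed for an external table):
    //   CREATE EXTERNAL TABLE mylogs (host STRING, path STRING)
    //   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    //   LOCATION '/warehouse/mylogs';
  }
}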



Re: write data into HBase via spark

2013-12-06 Thread Philip Ogren

Hao,

Thank you for the detailed response!  (even if delayed!)

I'm curious to know what version of hbase you added to your pom file.

Thanks,
Philip

On 11/14/2013 10:38 AM, Hao REN wrote:

Hi, Philip.

Basically, we need PairRDDFunctions.saveAsHadoopDataset to do the 
job; as HBase is not a filesystem, saveAsHadoopFile doesn't work.


def saveAsHadoopDataset(conf: JobConf): Unit

This function takes a JobConf parameter, which should be configured. 
Essentially, you need to set the output format and the name of the output 
table.


// step 1: JobConf setup:

// Note: the mapred package is used, instead of the mapreduce package
// which contains the new hadoop APIs.

import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client._
// ... some other settings

val conf = HBaseConfiguration.create()

// general hbase settings
conf.set("hbase.rootdir", "hdfs://" + nameNodeURL + ":" + hdfsPort + "/hbase")
conf.setBoolean("hbase.cluster.distributed", true)
conf.set("hbase.zookeeper.quorum", hostname)
conf.setInt("hbase.client.scanner.caching", 1)
// ... some other settings

val jobConfig: JobConf = new JobConf(conf, this.getClass)

// Note: TableOutputFormat is used although deprecated, because
// JobConf is an old hadoop API
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)


// step 2: give your mapping:

// The last thing to do is map your local data schema to the HBase one.
// Say our HBase schema is as below:
// row    cf:col_1    cf:col_2

// And in Spark you have an RDD of triples, like (1, 2, 3), (4, 5, 6), ...
// So you should map RDD[(Int, Int, Int)] to RDD[(ImmutableBytesWritable, Put)],
// where the Put carries the mapping.

// You can define a function used by RDD.map, for example:

def convert(triple: (Int, Int, Int)) = {
  val p = new Put(Bytes.toBytes(triple._1))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
  (new ImmutableBytesWritable, p)
}

// Suppose you have an RDD[(Int, Int, Int)] called localData; then
// writing the data to HBase can be done by:

new PairRDDFunctions(localData.map(convert)).saveAsHadoopDataset(jobConfig)


Voilà. That's all you need. Hopefully, this simple example could help.

Hao.
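
Pulling the fragments above into one place, a self-contained sketch might look
like the following. The master URL, ZooKeeper host, table name and column family
are illustrative, and it assumes the HBase 0.94-era mapred TableOutputFormat plus
Spark 0.8's implicit PairRDDFunctions conversion:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

object HBaseWriteSketch {
  // Map a local triple to a (row key wrapper, Put) pair for TableOutputFormat.
  def convert(triple: (Int, Int, Int)): (ImmutableBytesWritable, Put) = {
    val p = new Put(Bytes.toBytes(triple._1))
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
    (new ImmutableBytesWritable, p)
  }

  def main(args: Array[String]) {
    val sc = new SparkContext("spark://master:7077", "HBaseWriteSketch")

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "zk-host")   // your ZooKeeper quorum

    val jobConfig = new JobConf(conf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "test_table")  // table must already exist

    val localData = sc.parallelize(Seq((1, 2, 3), (4, 5, 6)))
    localData.map(convert).saveAsHadoopDataset(jobConfig)   // writes the Puts into HBase

    sc.stop()
  }
}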





2013/11/13 Philip Ogren philip.og...@oracle.com 
mailto:philip.og...@oracle.com


Hao,

If you have worked out the code and turn it into an example that
you can share, then please do!  This task is in my queue of things
to do so any helpful details that you uncovered would be most
appreciated.

Thanks,
Philip



On 11/13/2013 5:30 AM, Hao REN wrote:

Ok, I worked it out.

The following thread helps a lot.


http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201310.mbox/%3C7B4868A9-B83E-4507-BB2A-2721FCE8E738%40gmail.com%3E

Hao


2013/11/12 Hao REN julien19890...@gmail.com
mailto:julien19890...@gmail.com

Could someone show me a simple example about how to write
data into HBase via spark ?

I have checked HbaseTest example, it's only for reading from
HBase.

Thank you.

-- 
REN Hao


Data Engineer @ ClaraVista

Paris, France

Tel: +33 06 14 54 57 24 tel:%2B33%2006%2014%2054%2057%2024




-- 
REN Hao


Data Engineer @ ClaraVista

Paris, France

Tel: +33 06 14 54 57 24 tel:%2B33%2006%2014%2054%2057%2024





--
REN Hao

Data Engineer @ ClaraVista

Paris, France

Tel:  +33 06 14 54 57 24




Re: Spark Error Log

2013-12-06 Thread Wenlei Xie
Hi Prashant,

Thank you! The reason I would like to do it is that currently my program's
output is set to stdout, and it would be mixed with Spark's log.
That's not a big issue anyway, since I can either disable log or put some
prefix before my info :)

Best,
Wenlei


On Sun, Dec 1, 2013 at 2:49 AM, Prashant Sharma scrapco...@gmail.com wrote:

 Hi,

 I am not sure I know how to; the above should have worked. Apart from the
 well-known trick of redirecting stdout to stderr, knowing why
 you need it would be great!


 On Sat, Nov 30, 2013 at 2:53 PM, Wenlei Xie wenlei@gmail.com wrote:

 Hi Prashant,

 I copied log4j.properties.template to log4j.properties, but now
 all the information goes to stdout rather than stderr.

 How should I make it to output to the stderr? I have tried to change
 log4j.properties to be

 log4j.rootCategory=INFO, stderr
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
 %c{1}: %m%n

 But it still prints to the stdout...
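
 One thing worth checking here (an assumption based on standard log4j 1.2
 behaviour rather than anything stated in the thread): ConsoleAppender writes to
 System.out unless its target is set, so the configuration above would also need

 log4j.appender.stderr.Target=System.err

 for the output to actually go to stderr.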


 On Thu, Nov 28, 2013 at 10:39 AM, Prashant Sharma 
 scrapco...@gmail.com wrote:

 I think all that is needed is an log4j.properties on the classpath
 http://logging.apache.org/log4j/1.2/faq.html#noconfig


 On Thu, Nov 28, 2013 at 11:52 PM, Patrick Wendell pwend...@gmail.com wrote:

 Hey Wenlei,

 There is some issue in master that is repressing the log output - I'm
 trying to debug it before we release 0.8.1. Can you explain exactly
 how you are running Spark? Are you running the shell or are you
 running a standalone application?

 - Patrick

 On Thu, Nov 28, 2013 at 12:54 AM, Wenlei Xie wenlei@gmail.com
 wrote:
  Hi,
 
  I remember Spark used to print a detailed error log to stderr (e.g.
  constructing an RDD, evaluating it, how much memory each partition
  consumes). But I cannot find it anymore; I only see the following information:
 
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in
 
 [jar:file:/u/ytian/wenlei/dynamicGraph/graphx/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in
 
 [jar:file:/u/ytian/wenlei/dynamicGraph/graphx/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
  explanation.
  SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  log4j:WARN No appenders could be found for logger
  (akka.event.slf4j.Slf4jEventHandler).
  log4j:WARN Please initialize the log4j system properly.
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
  more info.
 
 
  What should I do for it?
 
  Thanks,
 
  Wenlei




 --
 s




 --
 Wenlei Xie (谢文磊)

 Department of Computer Science
 5132 Upson Hall, Cornell University
 Ithaca, NY 14853, USA
 Phone: (607) 255-5577
 Email: wenlei@gmail.com




 --
 s




-- 
Wenlei Xie (谢文磊)

Department of Computer Science
5132 Upson Hall, Cornell University
Ithaca, NY 14853, USA
Phone: (607) 255-5577
Email: wenlei@gmail.com


RE: write data into HBase via spark

2013-12-06 Thread Benjamin Kim
Hi Philip/Hao,

I was wondering if there is a simple working example out there that I can just
run and see work. Then I can customize it for our needs. Unfortunately, this
explanation still confuses me a little.

Here is a little about the environment we are working with. We have Cloudera's
CDH 4.4.0 installed, and it comes with HBase 0.94.6. We get data streamed in
using Flume-NG 1.4.0. All of this is managed using Cloudera Manager 4.7.2 to
set up and configure these services.

If you need any more information or are able to help, I would be glad to
accommodate.

Thanks,
Ben

Date: Fri, 6 Dec 2013 18:07:08 -0700
From: philip.og...@oracle.com
To: user@spark.incubator.apache.org
Subject: Re: write data into HBase via spark


  

  
  
Hao,

Thank you for the detailed response!  (even if delayed!)

I'm curious to know what version of hbase you added to your pom file.

Thanks,
Philip

On 11/14/2013 10:38 AM, Hao REN wrote:
 [remainder of the quoted message trimmed; see Hao REN's reply earlier in this thread]

Re: Incremental Updates to an RDD

2013-12-06 Thread Christopher Nguyen
Kyle, the fundamental contract of a Spark RDD is that it is immutable. This
follows the paradigm where data is (functionally) transformed into other
data, rather than mutated. This allows these systems to make certain
assumptions and guarantees that otherwise they wouldn't be able to.

Now we've been able to get mutative behavior with RDDs---for fun,
almost---but that's implementation dependent and may break at any time.

It turns out this behavior is quite appropriate for the analytic stack,
where you typically apply the same transform/operator to all data. You're
finding that transactional systems are the exact opposite, where you
typically apply a different operation to individual pieces of the data.
Incidentally this is also the dichotomy between column- and row-based
storage being optimal for each respective pattern.

Spark is intended for the analytic stack. To use Spark as the persistence
layer of a transaction system is going to be very awkward. I know there are
some vendors who position their in-memory databases as good for both OLTP
and OLAP use cases, but when you talk to them in depth they will readily
admit that it's really optimal for one and not the other.

If you want to make a project out of making a special Spark RDD that
supports this behavior, it might be interesting. But there will be no
simple shortcuts to get there from here.
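
As a concrete illustration of the filter/union pattern being discussed (a toy
sketch; the dataset and element values are made up):

import org.apache.spark.SparkContext

object ImmutableUpdateSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ImmutableUpdateSketch")
    val base = sc.parallelize(1 to 1000000)

    // "Delete" one element: a full filter pass that yields a new RDD.
    val withoutFortyTwo = base.filter(_ != 42)

    // "Insert" one element: union with a single-element RDD, again a new RDD.
    val updated = withoutFortyTwo.union(sc.parallelize(Seq(1000001)))

    println(updated.count())   // 1000000: one element removed, one added
    sc.stop()
  }
}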

--
Christopher T. Nguyen
Co-founder & CEO, Adatao http://adatao.com
linkedin.com/in/ctnguyen



On Fri, Dec 6, 2013 at 10:56 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

 I'm trying to figure out if I can use an RDD to backend an interactive
 server. One of the requirements would be to have incremental updates to
 elements in the RDD, ie transforms that change/add/delete a single element
 in the RDD.
 It seems pretty drastic to do a full RDD filter to remove a single
 element, or do the union of the RDD with another one of size 1 to add an
 element. (Or is it?) Is there an efficient way to do this in Spark? Are
 there any example of this kind of usage?

 Thank you,
 Kyle



Build Spark with maven

2013-12-06 Thread Azuryy Yu
Hey dears,

Can you give me a Maven repo so I can compile Spark with Maven?

I'm using http://repo1.maven.org/maven2/ currently

but it complains that it cannot find akka-actor-2.0.1.  I searched
repo1.maven and I also cannot find akka-actor-2.0.1, which is quite old.

Another strange thing I can see:
the pom has <scala.version>2.9.3</scala.version>,
but Maven downloads scala-2.9.2 during compilation. Why is that?


Thanks.