Re: Spark heap issues
Still seeing a whole lot of the following errors:

java.lang.OutOfMemoryError: Java heap space
13/12/05 16:04:13 INFO executor.StandaloneExecutorBackend: Got assigned task 553
13/12/05 16:04:13 INFO executor.Executor: Running task ID 553

The issue seems to be that the process hangs because we are probably performing back-to-back full GC cycles:

1536.617: [Full GC 1536.617: [CMS: 707839K->707839K(707840K), 5.0507000 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 5.0507940 secs] [Times: user=4.94 sys=0.00, real=5.05 secs]
1541.669: [Full GC 1541.669: [CMS: 707840K->707839K(707840K), 4.5483600 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.5484390 secs] [Times: user=4.47 sys=0.00, real=4.55 secs]
1546.218: [Full GC 1546.218: [CMS: 707839K->707839K(707840K), 4.5937460 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.5938460 secs] [Times: user=4.59 sys=0.00, real=4.60 secs]
1550.812: [Full GC 1550.812: [CMS: 707839K->707839K(707840K), 5.3572370 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 5.3573840 secs] [Times: user=5.26 sys=0.01, real=5.35 secs]
1556.171: [Full GC 1556.171: [CMS: 707840K->694574K(707840K), 4.1462520 secs] 1014528K->860511K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.1463350 secs] [Times: user=4.13 sys=0.00, real=4.15 secs]
1560.329: [GC [1 CMS-initial-mark: 694574K(707840K)] 874378K(1014528K), 0.4269160 secs] [Times: user=0.41 sys=0.00, real=0.43 secs]

I tried the following parameters and they do not seem to help:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.akka.timeout", "30") // in seconds
System.setProperty("spark.executor.memory", "15g")
System.setProperty("spark.akka.frameSize", "2000") // in MB
System.setProperty("spark.akka.threads", "8")

Thanks

On Thu, Dec 5, 2013 at 11:31 PM, purav aggarwal puravaggarwal...@gmail.com wrote:

Try allocating some more resources to your application. You seem to be using 512 MB for your worker node (you can verify that from the master UI). Try putting the following settings into your code and see if it helps:

System.setProperty("spark.executor.memory", "15g") // will allocate more memory
System.setProperty("spark.akka.frameSize", "2000")
System.setProperty("spark.akka.threads", "16") // depends on the number of cores on your worker machine

On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all learner1...@gmail.com wrote:

Hi,

Trying to do a join operation on an RDD. My input is pipe-delimited data in two files: one file is 24 MB and the other is 285 MB.
The setup being used is the single-node (server) setup, with SPARK_MEM set to 512m.

Master:
/pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080

Worker:
/pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077

App:
/pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4

Here is the code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel

object SimpleApp {
  def main(args: Array[String]) {
    System.setProperty("spark.local.dir", "/spark-0.8.0-incubating-bin-cdh4/tmp")
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.akka.timeout", "30") // in seconds
    val dataFile2 = "/tmp_data/data1.txt"
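For reference, below is a minimal, self-contained sketch of this kind of pipe-delimited join, with the configuration properties set (as quoted strings) before the SparkContext is created, since properties set afterwards are generally not picked up. The file paths, join-key position, memory value, and app name are placeholders, not values taken from the truncated code above.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object JoinSketch {
  def main(args: Array[String]) {
    // Properties must be in place before the SparkContext is constructed.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.executor.memory", "2g") // assumption: a value the worker can actually offer

    val sc = new SparkContext("spark://localhost:7077", "JoinSketch")

    // Assume the first pipe-delimited field is the join key.
    // Splitting on a Char avoids regex-escaping the '|' character.
    val left = sc.textFile("/tmp_data/data1.txt").map { line =>
      val fields = line.split('|'); (fields(0), line)
    }
    val right = sc.textFile("/tmp_data/data2.txt").map { line =>
      val fields = line.split('|'); (fields(0), line)
    }

    left.join(right).saveAsTextFile("/tmp_data/joined")
    sc.stop()
  }
}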
Spark Import Issue
Hello,

I am new to the Spark system, and I am trying to write a simple program to get myself familiar with how Spark works. I am currently having a problem importing the Spark package. I am getting the following compiler error: package org.apache.spark.api.java does not exist.

I have spark-0.8.0-incubating installed. I ran the commands sbt/sbt compile, sbt/sbt assembly, and sbt/sbt publish-local without any errors. My sql.java file is located in the spark-0.8.0-incubating root directory. I tried to compile the code using "javac sql.java" and "javac -cp assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating*.jar sql.java".

Here is the code for sql.java:

package shark;

import java.io.Serializable;
import java.util.List;
import java.io.*;
import org.apache.spark.api.java.*; // Issue is here

public class sql implements Serializable {
  public static void main(String[] args) {
    System.out.println("Hello World");
  }
}

What do I need to do in order for Java to import the Spark code properly? Any advice would be greatly appreciated.

Thank you,
Garrett Hamers
Cluster not accepting jobs
Hi, all. I'm trying to connect to a remote cluster from my machine, using Spark 0.7.3. In conf/spark-env.sh, I've set MASTER, SCALA_HOME, SPARK_MASTER_IP, and SPARK_MASTER_PORT.

When I try to run a job, it starts, but never gets anywhere, and I keep getting the following error message:

13/12/06 13:37:20 WARN cluster.ClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered

I look at the cluster UI in a browser, and it says it has 8 workers registered, all alive. What does this error mean? I assume I'm missing something in the setup - does anyone know what?

Thanks in advance,
-Nathan Kronenfeld
Re: Cluster not accepting jobs
Never mind, I figured it out - apparently it was different DNS resolutions locally and within the cluster; when I use the IP address instead of the machine name in MASTER, it all seems to work.

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
Re: Cluster not accepting jobs
Yeah, in general, make sure you use exactly the same "cluster URL" string shown on the master's web UI. There's currently a limitation in Akka where different ways of specifying the hostname won't work.

Matei
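For concreteness, a minimal sketch of what that advice looks like in application code; the URL and app name here are placeholders, and the only point is that the string passed to SparkContext (or exported as MASTER) should match the spark://host:port shown on the master's web UI character for character.

import org.apache.spark.SparkContext

object ConnectSketch {
  def main(args: Array[String]) {
    // Use the exact URL the master web UI displays, e.g. the IP form if that is what it shows.
    val masterUrl = "spark://10.1.1.1:7077" // placeholder
    val sc = new SparkContext(masterUrl, "ConnectSketch")

    // Quick sanity check that executors are actually being assigned.
    println(sc.parallelize(1 to 100).count())
    sc.stop()
  }
}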
Spark 0.8.0 Compiling issues
Hi there:

I've been trying to compile using sbt/sbt assembly and mvn clean package (with memory adjustments as suggested here: http://spark.incubator.apache.org/docs/latest/building-with-maven.html). Unfortunately, compiling fails for both of them with the following error (here it is with Maven, but with SBT the error happens at the same class):

[INFO] Using incremental compilation
[INFO] 'compiler-interface' not yet compiled for Scala 2.9.3. Compiling...
[INFO] Compilation completed in 17.554 s
[INFO] Compiling 258 Scala sources and 16 Java sources to /home/gustavo/tools/spark-0.8.0-incubating/core/target/scala-2.9.3/classes...
[WARNING] /home/gustavo/tools/spark-0.8.0-incubating/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala:129: method cleanupJob in class OutputCommitter is deprecated: see corresponding Javadoc for more information.
[WARNING] getOutputCommitter().cleanupJob(getJobContext())
[WARNING] ^
[WARNING] /home/gustavo/tools/spark-0.8.0-incubating/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala:592: method cleanupJob in class OutputCommitter is deprecated: see corresponding Javadoc for more information.
[WARNING] jobCommitter.cleanupJob(jobTaskContext)
[WARNING] ^
[ERROR] File name too long
[WARNING] two warnings found
[ERROR] one error found

Is 0.8.0 ready for production? Is 0.7.0 more stable? I'm running on Java 6.

Cheers
Gustavo
Re: Spark 0.8.0 Compiling issues
Yeah, unfortunately the reason it pops up more in 0.8.0 is because our package names got longer! But if you just do the build in /tmp it will work.

On Dec 6, 2013, at 11:35 AM, Josh Rosen rosenvi...@gmail.com wrote:

This isn't a Spark 0.8.0-specific problem. I googled for "sbt error file name too long" and found a couple of links that suggest that this error may crop up for Linux users with encrypted filesystems or home directories:

http://stackoverflow.com/questions/8404815/how-do-i-build-a-project-that-uses-sbt-as-its-build-system
https://github.com/sbt/sbt-assembly/issues/69

Try one of the workarounds from those links, such as storing the build target directory on an unencrypted volume.
Re: Pre-build Spark for Windows 8.1
Hey Andrew, unfortunately I don't know how easy this is. Maybe future versions of Akka have it. We can certainly ask them to do it in general, but I imagine there are some use cases where they wanted this behavior.

Matei

On Dec 5, 2013, at 2:49 PM, Andrew Ash and...@andrewash.com wrote:

Speaking of Akka and host sensitivity... How much have you hacked on Akka to get it to support all of: myhost.mydomain.int, myhost, and 10.1.1.1? It's kind of a pain to get the Spark URL to exactly match. I'm wondering if there are usability gains that could be made here or if we're pretty stuck.

On Thu, Dec 5, 2013 at 2:43 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hi,

When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the URL (uppercase instead of lowercase). Unfortunately Akka is very specific about seeing hostnames written in the same way on each node, or else it thinks the message is for another machine!

Matei

On Dec 5, 2013, at 8:27 AM, Adrian Bonar adrian.bo...@microsoft.com wrote:

The master starts up now as expected but the workers are unable to connect to the master. It looks like the master is refusing the connection messages but I'm not sure why. The first two error lines below are from trying to connect a worker from a separate machine and the last two error lines are from trying to connect a worker on the same machine as the master. I verified that the workers do not show up in the master's web UI.

MASTER:

D:\spark> spark-class org.apache.spark.deploy.master.Master
13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/12/05 08:08:34 INFO master.Master: Starting Spark master at spark://ADRIBONA-DEV-1:7077
13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT
13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/master/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/applications/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{*,null}
13/12/05 08:08:34 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8088
13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at http://ADRIBONA-DEV-1:8088
13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077
13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077
13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077
13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077

WORKER:

D:\spark> spark-class.cmd org.apache.spark.deploy.worker.Worker spark://adribona-dev-1:7077
13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM
13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark
13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT
13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/log,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/logPage,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started
Re: Spark 0.8.0 Compiling issues
Thanks! I will do that.

Cheers
Gustavo

On Fri, Dec 6, 2013 at 5:53 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Yeah, unfortunately the reason it pops up more in 0.8.0 is because our package names got longer! But if you just do the build in /tmp it will work.
Writing an RDD to Hive
I have a simple scenario that I'm struggling to implement. I would like to take a fairly simple RDD generated from a large log file, perform some transformations on it, and write the results out such that I can perform a Hive query either from Hive (via Hue) or Shark. I'm having trouble with the last step.

I am able to write my data out to HDFS and then execute a Hive "create table" statement followed by a "load data" statement as a separate step. I really dislike this separate manual step and would like to have it all accomplished in my Spark application. To this end, I have investigated two possible approaches as detailed below. It's probably too much information, so I'll ask my more basic question first: does anyone have a basic recipe/approach for loading data in an RDD to a Hive table from a Spark application?

1) Load it into HBase via PairRDDFunctions.saveAsHadoopDataset. There is a nice detailed email on how to do this here: http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201311.mbox/%3ccacyzca3askwd-tujhqi1805bn7sctguaoruhd5xtxcsul1a...@mail.gmail.com%3E. I didn't get very far, though, because as soon as I added an hbase dependency (corresponding to the version of HBase we are running) to my pom.xml file, I had an slf4j dependency conflict that caused my current application to explode. I tried the latest released version and the slf4j dependency problem went away, but then the deprecated class TableOutputFormat no longer exists. Even if loading the data into HBase were trivially easy (and the detailed email suggests otherwise), I would then need to query HBase from Hive, which seems a little clunky.

2) So, I decided that Shark might be an easier option. All the examples provided in their documentation seem to assume that you are using Shark as an interactive application from a shell. Various threads I've seen seem to indicate that Shark isn't really intended to be used as a dependency in your Spark code (see this https://groups.google.com/forum/#%21topic/shark-users/DHhslaOGPLg/discussion and that https://groups.google.com/forum/#%21topic/shark-users/2_Ww1xlIgvo/discussion). It follows then that one can't add a Shark dependency to a pom.xml file, because Shark isn't released via Maven Central (that I can tell; perhaps it's in some other repo?). Of course, there are ways of creating a local dependency in Maven, but it starts to feel very hacky.

I realize that I've given sufficient detail to expose my ignorance in a myriad of ways. Please feel free to shine light on any of my misconceptions!

Thanks,
Philip
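One common workaround for the "basic recipe" question above is to write the RDD out with saveAsTextFile to the directory that a Hive external table points at, which removes the separate "load data" step. The sketch below assumes placeholder paths, delimiter, schema, and table name, and the one-time DDL would still be issued from Hive or Hue rather than from the Spark job itself.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object WriteForHiveSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "WriteForHiveSketch")

    // Transform the log lines into tab-delimited rows matching the Hive table's schema.
    val rows = sc.textFile("hdfs:///logs/access.log") // placeholder input path
      .map(_.split(" "))
      .map(fields => fields(0) + "\t" + fields(1))    // e.g. (host, url)

    // Write straight to the directory the external table points at.
    rows.saveAsTextFile("hdfs:///warehouse/access_parsed") // placeholder location

    // One-time DDL, run from Hive/Hue, shown here only for reference:
    //   CREATE EXTERNAL TABLE access_parsed (host STRING, url STRING)
    //   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    //   LOCATION '/warehouse/access_parsed';
    sc.stop()
  }
}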
Re: write data into HBase via spark
Hao,

Thank you for the detailed response! (even if delayed!) I'm curious to know what version of HBase you added to your pom file.

Thanks,
Philip

On 11/14/2013 10:38 AM, Hao REN wrote:

Hi, Philip.

Basically, we need PairRDDFunctions.saveAsHadoopDataset to do the job; as HBase is not a filesystem, saveAsHadoopFile doesn't work.

def saveAsHadoopDataset(conf: JobConf): Unit

This function takes a JobConf parameter, which should be configured. Essentially, you need to set the output format and the name of the output table.

// step 1: JobConf setup
// Note: the mapred package is used, instead of the mapreduce package which contains the new Hadoop APIs.
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client._

// ... some other settings
val conf = HBaseConfiguration.create()

// general HBase settings
conf.set("hbase.rootdir", "hdfs://" + nameNodeURL + ":" + hdfsPort + "/hbase")
conf.setBoolean("hbase.cluster.distributed", true)
conf.set("hbase.zookeeper.quorum", hostname)
conf.setInt("hbase.client.scanner.caching", 1)
// ... some other settings

val jobConfig: JobConf = new JobConf(conf, this.getClass)

// Note: TableOutputFormat is used as deprecated code, because JobConf is an old Hadoop API
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)

// step 2: give your mapping
// The last thing to do is map your local data schema to the HBase one.
// Say our HBase schema is as below:
//   row | cf:col_1 | cf:col_2
// And in Spark you have an RDD of triples, like (1, 2, 3), (4, 5, 6), ...
// So you should map RDD[(Int, Int, Int)] to RDD[(ImmutableBytesWritable, Put)], where the Put carries the mapping.
// You can define a function used by RDD.map, for example:

def convert(triple: (Int, Int, Int)) = {
  val p = new Put(Bytes.toBytes(triple._1))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
  (new ImmutableBytesWritable, p)
}

// Suppose you have an RDD[(Int, Int, Int)] called localData; then writing data to HBase can be done by:
new PairRDDFunctions(localData.map(convert)).saveAsHadoopDataset(jobConfig)

Voilà. That's all you need. Hopefully, this simple example could help.

Hao.

2013/11/13 Philip Ogren philip.og...@oracle.com

Hao,

If you have worked out the code and turned it into an example that you can share, then please do! This task is in my queue of things to do, so any helpful details that you uncovered would be most appreciated.

Thanks,
Philip

On 11/13/2013 5:30 AM, Hao REN wrote:

Ok, I worked it out. The following thread helps a lot:
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201310.mbox/%3C7B4868A9-B83E-4507-BB2A-2721FCE8E738%40gmail.com%3E

Hao

2013/11/12 Hao REN julien19890...@gmail.com

Could someone show me a simple example of how to write data into HBase via Spark? I have checked the HbaseTest example; it's only for reading from HBase.

Thank you.

--
REN Hao
Data Engineer @ ClaraVista
Paris, France
Tel: +33 06 14 54 57 24
Re: Spark Error Log
Hi Prashant,

Thank you! The reason I would like to do it is that currently my program's output goes to stdout, and it would be mixed with Spark's log. That's not a big issue anyway, since I can either disable the log or put some prefix before my info :)

Best,
Wenlei

On Sun, Dec 1, 2013 at 2:49 AM, Prashant Sharma scrapco...@gmail.com wrote:

Hi,

I am not sure I know how to. The above should have worked. Apart from the trick everyone knows (that you can redirect stdout to stderr), knowing why you need it would be great!

On Sat, Nov 30, 2013 at 2:53 PM, Wenlei Xie wenlei@gmail.com wrote:

Hi Prashant,

I copied log4j.properties.template to log4j.properties, but now all the information goes to stdout rather than stderr. How should I make it output to stderr? I have tried changing log4j.properties to be:

log4j.rootCategory=INFO, stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

But it still prints to stdout...

On Thu, Nov 28, 2013 at 10:39 AM, Prashant Sharma scrapco...@gmail.com wrote:

I think all that is needed is a log4j.properties on the classpath: http://logging.apache.org/log4j/1.2/faq.html#noconfig

On Thu, Nov 28, 2013 at 11:52 PM, Patrick Wendell pwend...@gmail.com wrote:

Hey Wenlei,

There is some issue in master that is repressing the log output - I'm trying to debug it before we release 0.8.1. Can you explain exactly how you are running Spark? Are you running the shell or are you running a standalone application?

- Patrick

On Thu, Nov 28, 2013 at 12:54 AM, Wenlei Xie wenlei@gmail.com wrote:

Hi,

I remember Spark used to print a detailed error log to stderr (e.g. constructing an RDD, evaluating it, how much memory each partition consumes). But I cannot find it anymore; I only get the following information:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/u/ytian/wenlei/dynamicGraph/graphx/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/u/ytian/wenlei/dynamicGraph/graphx/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jEventHandler).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

What should I do for it?

Thanks,
Wenlei

--
Wenlei Xie (谢文磊)
Department of Computer Science
5132 Upson Hall, Cornell University
Ithaca, NY 14853, USA
Phone: (607) 255-5577
Email: wenlei@gmail.com
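For what it's worth, the config quoted above probably still writes to stdout because log4j's ConsoleAppender defaults to System.out; it has a Target property that can point it at System.err instead. A sketch of the adjusted log4j.properties, keeping the appender name from the message above:

log4j.rootCategory=INFO, stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
# Send this appender's output to standard error instead of the default standard out.
log4j.appender.stderr.Target=System.err
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n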
RE: write data into HBase via spark
Hi Philip/Hao,

I was wondering if there is a simple working example out there that I can just run and see work. Then I can customize it for our needs. Unfortunately, this explanation still confuses me a little.

Here is a little about the environment we are working with. We have Cloudera's CDH 4.4.0 installed, and it comes with HBase 0.94.6. We get data streamed in using Flume-NG 1.4.0. All of this is managed using Cloudera Manager 4.7.2 to set up and configure these services.

If you need any more information or are able to help, I would be glad to accommodate.

Thanks,
Ben

Date: Fri, 6 Dec 2013 18:07:08 -0700
From: philip.og...@oracle.com
To: user@spark.incubator.apache.org
Subject: Re: write data into HBase via spark

Hao,

Thank you for the detailed response! (even if delayed!) I'm curious to know what version of hbase you added to your pom file.

Thanks,
Philip
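For a CDH 4.4.0 cluster like the one described above, one plausible approach is to depend on the HBase artifact Cloudera publishes for that release, so the client classes match the running services. The repository URL and coordinates below are assumptions based on Cloudera's usual naming scheme, not values verified against that cluster, and should be double-checked:

<!-- Assumed coordinates for the HBase shipped with CDH 4.4.0; verify against the Cloudera repository. -->
<repositories>
  <repository>
    <id>cloudera-repos</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94.6-cdh4.4.0</version>
  </dependency>
</dependencies>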
Re: Incremental Updates to an RDD
Kyle, the fundamental contract of a Spark RDD is that it is immutable. This follows the paradigm where data is (functionally) transformed into other data, rather than mutated. This allows these systems to make certain assumptions and guarantees that otherwise they wouldn't be able to.

Now, we've been able to get mutative behavior with RDDs (for fun, almost), but that's implementation dependent and may break at any time.

It turns out this behavior is quite appropriate for the analytic stack, where you typically apply the same transform/operator to all data. You're finding that transactional systems are the exact opposite, where you typically apply a different operation to individual pieces of the data. Incidentally, this is also the dichotomy between column- and row-based storage being optimal for each respective pattern.

Spark is intended for the analytic stack. Using Spark as the persistence layer of a transactional system is going to be very awkward. I know there are some vendors who position their in-memory databases as good for both OLTP and OLAP use cases, but when you talk to them in depth they will readily admit that they are really optimal for one and not the other.

If you want to make a project out of building a special Spark RDD that supports this behavior, it might be interesting. But there will be no simple shortcuts to get there from here.

--
Christopher T. Nguyen
Co-founder CEO, Adatao
http://adatao.com
linkedin.com/in/ctnguyen

On Fri, Dec 6, 2013 at 10:56 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm trying to figure out if I can use an RDD to back an interactive server. One of the requirements would be to have incremental updates to elements in the RDD, i.e. transforms that change/add/delete a single element in the RDD. It seems pretty drastic to do a full RDD filter to remove a single element, or to union the RDD with another one of size 1 to add an element. (Or is it?)

Is there an efficient way to do this in Spark? Are there any examples of this kind of usage?

Thank you,
Kyle
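For concreteness, a small sketch of the "update" pattern Kyle describes: because an RDD is immutable, a single-element delete or insert can only be expressed by deriving a whole new RDD, which is exactly why it feels drastic. The key type, values, and predicate here are placeholders.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ImmutableUpdateSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ImmutableUpdateSketch")
    val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))

    // "Delete" key "b": every partition is scanned and a new RDD is produced.
    val withoutB = data.filter { case (key, _) => key != "b" }

    // "Insert" one element: union with a one-element RDD, again producing a new RDD.
    val withD = withoutB.union(sc.parallelize(Seq(("d", 4))))

    println(withD.collect().toList)
    sc.stop()
  }
}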
Build Spark with maven
Hey dears,

Can you give me a Maven repo, so I can compile Spark with Maven? I'm currently using http://repo1.maven.org/maven2/, but it complains that it cannot find akka-actor-2.0.1. I searched repo1.maven.org and I also cannot find akka-actor-2.0.1, which is quite old.

Another strange output I can see: the pom has <scala.version>2.9.3</scala.version>, but Maven downloads scala-2.9.2 during compile. Why is that?

Thanks.
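In case it helps, the akka-actor 2.0.x artifacts were distributed via the Akka/Typesafe repository rather than Maven Central (consistent with them not showing up on repo1.maven.org), so the build needs to be able to reach that extra repository. A sketch of such a <repository> entry is below; the id, name, and URL are assumptions to double-check against the repositories section already present in the Spark pom, not values confirmed here:

<!-- Assumption: the Akka releases repository referenced by the Spark 0.8 build; verify the URL in your Spark pom.xml. -->
<repository>
  <id>akka-repo</id>
  <name>Akka Repository</name>
  <url>http://repo.akka.io/releases/</url>
</repository>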