Re: Why are there different parts in my CSV?
You can directly write to HBase with Spark. Here's an example of doing that: https://issues.apache.org/jira/browse/SPARK-944 Thanks Best Regards On Sat, Feb 14, 2015 at 2:55 PM, Su She suhsheka...@gmail.com wrote: Hello Akhil, thank you for your continued help! 1) So, if I can write it programmatically after every batch, then technically I should be able to have just the csv files in one directory. However, can the /desired/output/file.txt be in HDFS? If it is only local, I am not sure it will help me for the use case I describe in 2). So can I do something like this: hadoop fs -getmerge /output/dir/on/hdfs desired/dir/in/hdfs ? 2) Just to make sure I am going down the right path... my end use case is to use Hive or HBase to create a database from these csv files. Is there an easy way for Hive to read /user/test/many sub directories/with one csv file in each into a table? Thank you!
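For context, a rough sketch of the kind of direct HBase write Akhil is suggesting, using saveAsNewAPIHadoopDataset with TableOutputFormat. The table name "wordcounts" and column family "cf" are made up, messages is assumed to be a DStream[(String, String)] of (word, count) pairs, and the exact HBase client calls can differ between HBase versions:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._

// Hypothetical target table and output format wiring.
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "wordcounts")
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// Write each batch straight into HBase instead of per-batch directories on HDFS.
messages.foreachRDD { rdd =>
  rdd.map { case (word, count) =>
    val put = new Put(Bytes.toBytes(word))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count))
    (new ImmutableBytesWritable, put)
  }.saveAsNewAPIHadoopDataset(job.getConfiguration)
}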
Configuration Problem? (need help to get a Spark job executed)
Hi all, I am new to Spark and seem to have hit a common newbie obstacle. I have a pretty simple setup and job but I am unable to get past this error when executing a job: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory. I have so far gained a basic understanding of worker/executor/driver memory, but have run out of ideas what to try next - maybe someone has a clue.
My setup: three-node standalone cluster with C* and Spark on each node and the Datastax C*/Spark connector JAR placed on each node. On the master I have the slaves configured in conf/slaves and I am using sbin/start-all.sh to start the whole cluster.
On each node I have this in conf/spark-defaults.conf:
spark.master spark://devpeng-db-cassandra-1:7077
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.extraClassPath /opt/spark-cassandra-connector-assembly-1.2.0-alpha1.jar
and this in conf/spark-env.sh:
SPARK_WORKER_MEMORY=6g
My app looks like this:
object TestApp extends App {
  val conf = new SparkConf(true).set("spark.cassandra.connection.host", "devpeng-db-cassandra-1.")
  val sc = new SparkContext("spark://devpeng-db-cassandra-1:7077", "testApp", conf)
  val rdd = sc.cassandraTable("test", "kv")
  println("Count: " + String.valueOf(rdd.count))
  println(rdd.first)
}
Any kind of idea what to check next would help me at this point, I think. Jan
Log of the application start:
[info] Loading project definition from /Users/jan/projects/gkh/jump/workspace/gkh-spark-example/project [info] Set current project to csconnect (in build file:/Users/jan/projects/gkh/jump/workspace/gkh-spark-example/) [info] Compiling 1 Scala source to /Users/jan/projects/gkh/jump/workspace/gkh-spark-example/target/scala-2.10/classes... [info] Running jump.TestApp Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/02/14 10:30:11 INFO SecurityManager: Changing view acls to: jan 15/02/14 10:30:11 INFO SecurityManager: Changing modify acls to: jan 15/02/14 10:30:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jan); users with modify permissions: Set(jan) 15/02/14 10:30:11 INFO Slf4jLogger: Slf4jLogger started 15/02/14 10:30:11 INFO Remoting: Starting remoting 15/02/14 10:30:12 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@xx:58197] 15/02/14 10:30:12 INFO Utils: Successfully started service 'sparkDriver' on port 58197. 15/02/14 10:30:12 INFO SparkEnv: Registering MapOutputTracker 15/02/14 10:30:12 INFO SparkEnv: Registering BlockManagerMaster 15/02/14 10:30:12 INFO DiskBlockManager: Created local directory at /var/folders/vr/w3whx92d0356g5nj1p6s59grgn/T/spark-local-20150214103012-5b53 15/02/14 10:30:12 INFO MemoryStore: MemoryStore started with capacity 530.3 MB 2015-02-14 10:30:12.304 java[24999:3b07] Unable to load realm info from SCDynamicStore 15/02/14 10:30:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/02/14 10:30:12 INFO HttpFileServer: HTTP File server directory is /var/folders/vr/w3whx92d0356g5nj1p6s59grgn/T/spark-48459a22-c1ff-42d5-8b8e-cc89fe84933d 15/02/14 10:30:12 INFO HttpServer: Starting HTTP Server 15/02/14 10:30:12 INFO Utils: Successfully started service 'HTTP file server' on port 58198. 15/02/14 10:30:12 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/14 10:30:12 INFO SparkUI: Started SparkUI at http://xx:4040 15/02/14 10:30:12 INFO AppClient$ClientActor: Connecting to master spark://devpeng-db-cassandra-1:7077... 15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150214103013-0001 15/02/14 10:30:13 INFO AppClient$ClientActor: Executor added: app-20150214103013-0001/0 on worker-20150214102534-devpeng-db-cassandra-2.devpeng (devpeng-db-cassandra-2.devpeng.x:57563) with 8 cores 15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150214103013-0001/0 on hostPort devpeng-db-cassandra-2.devpeng.:57563 with 8 cores, 512.0 MB RAM 15/02/14 10:30:13 INFO AppClient$ClientActor: Executor added: app-20150214103013-0001/1 on worker-20150214102534-devpeng-db-cassandra-3.devpeng.-38773 (devpeng-db-cassandra-3.devpeng.xx:38773) with 8 cores 15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150214103013-0001/1 on hostPort devpeng-db-cassandra-3.devpeng.xe:38773 with 8 cores, 512.0 MB RAM 15/02/14 10:30:13 INFO AppClient$ClientActor: Executor updated: app-20150214103013-0001/0 is now LOADING 15/02/14 10:30:13 INFO AppClient$ClientActor: Executor updated: app-20150214103013-0001/1 is now LOADING 15/02/14 10:30:13 INFO AppClient$ClientActor: Executor updated:
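Not a diagnosis, but for reference: the 512.0 MB granted per executor in the log above is the default spark.executor.memory. A sketch of how the resource requests could be made explicit on the SparkConf used in TestApp (the values are placeholders, not a recommendation):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "devpeng-db-cassandra-1")
  .set("spark.executor.memory", "4g")   // heap requested per executor (default is 512m)
  .set("spark.cores.max", "8")          // cap on the total cores the application claims
val sc = new SparkContext("spark://devpeng-db-cassandra-1:7077", "testApp", conf)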
Re: Why are there different parts in my CSV?
http://stackoverflow.com/questions/23527941/how-to-write-to-csv-in-spark Just read this... seems like it should be easily readable. Thanks! On Sat, Feb 14, 2015 at 1:36 AM, Su She suhsheka...@gmail.com wrote: Thanks Akhil for the link. Is there a reason why there is a new directory created for each batch? Is this a format that is easily readable by other applications such as Hive/Impala?
Re: Why are there different parts in my CSV?
Simplest way would be to merge the output files at the end of your job, like: hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt If you want to do it programmatically, then you can use the FileUtil.copyMerge API, like: FileUtil.copyMerge(FileSystem of source (hdfs), /output-location, FileSystem of destination (hdfs), path to the merged file /merged-output, true (to delete the original dir), null) Thanks Best Regards On Sat, Feb 14, 2015 at 2:18 AM, Su She suhsheka...@gmail.com wrote: Thanks Akhil for the suggestion, it is now only giving me one part- file. Is there any way I can just create a file rather than a directory? It doesn't seem like there is a saveAsTextFile option for JavaPairRecieverDstream. Also, for the copy/merge API, how would I add that to my Spark job? Thanks Akhil! Best, Su On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: For a streaming application, for every batch it will create a new directory and put the data in it. If you don't want to have multiple files inside the directory as part- files, then you can do a repartition before the saveAs* call: messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class, String.class, (Class) TextOutputFormat.class); Thanks Best Regards On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I am writing simple word counts to hdfs using messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class, String.class, (Class) TextOutputFormat.class); 1) However, every 2 seconds I am getting a new *directory* that is titled as a csv. So I'll have test.csv, which will be a directory that has two files inside of it called part-0 and part-1 (something like that). This obviously makes it very hard for me to read the data stored in the csv files. I am wondering if there is a better way to store the JavaPairRecieverDStream and JavaPairDStream? 2) I know there is a copy/merge hadoop API for merging files... can this be done inside Java? I am not sure of the logic behind this API if I am using Spark Streaming, which is constantly making new files. Thanks a lot for the help!
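For reference, a sketch of what the copyMerge call could look like from Scala; the paths are placeholders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)
// Merge all part files under the per-batch output directory into one file,
// deleting the original directory afterwards (the 'true' flag).
FileUtil.copyMerge(fs, new Path("/output/dir/on/hdfs"),
                   fs, new Path("/merged-output/file.csv"),
                   true, conf, null)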
Strategy to automatically configure spark workers env params in standalone mode
We are planning to use servers of varying specs (32 GB, 64 GB, 244 GB RAM or even higher, and varying core counts) for a standalone deployment of Spark, but we do not know the spec of the server ahead of time, and we need to script up some logic that will run on the server at boot and automatically set the following params based on what it reads from the OS about cores and memory: SPARK_WORKER_CORES SPARK_WORKER_MEMORY SPARK_WORKER_INSTANCES What could such a script base its logic on, given the memory size and number of cores it sees? In other words, what are the recommended rules of thumb for dividing up a server (especially for larger RAM sizes) without knowing the Spark application and data size ahead of time? Thanks, Mike
Re: SQLContext.applySchema strictness
AFAIK, this is the expected behavior. You have to make sure that the schema matches the row. It won't give any error when you apply the schema as it doesn't validate the nature of the data.
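To illustrate the point, a small sketch against the Spark 1.2 SchemaRDD API, with made-up column names: applySchema accepts rows that do not match the schema without complaint, and the mismatch only surfaces when the data is actually evaluated.

import org.apache.spark.sql._

val rows = sc.parallelize(Seq(Row("alice", "not-a-number"), Row("bob", "31")))
// The schema claims the second column is an integer, but the rows hold strings.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

val people = sqlContext.applySchema(rows, schema)   // no error here
people.registerTempTable("people")
sqlContext.sql("SELECT age + 1 FROM people").collect()   // fails only now, at evaluation time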
Re: failing GraphX application ('GC overhead limit exceeded', 'Lost executor', 'Connection refused', etc.)
Oops! I forgot to excerpt the errors and warnings from that file:
15/02/12 08:02:03 ERROR TaskSchedulerImpl: Lost executor 4 on compute-0-3.wright: remote Akka client disassociated
15/02/12 08:03:00 WARN TaskSetManager: Lost task 1.0 in stage 28.0 (TID 37, compute-0-1.wright): java.lang.OutOfMemoryError: GC overhead limit exceeded
15/02/12 08:05:06 WARN TaskSetManager: Lost task 0.0 in stage 31.1 (TID 48, compute-0-2.wright): FetchFailed(BlockManagerId(0, wright.cs.umass.edu, 60837), shuffleId=0, mapId=1, reduceId=1, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to wright.cs.umass.edu/128.119.241.146:60837 Caused by: java.io.IOException: Failed to connect to wright.cs.umass.edu/128.119.241.146:60837 Caused by: java.net.ConnectException: Connection refused: wright.cs.umass.edu/128.119.241.146:60837
I see Lost executor messages on all nodes, not just the 0-3 one, so it's not node-specific. Any ideas about how to fix this? Thanks again, matt On Feb 12, 2015, at 10:46 AM, Matthew Cornell wrote: Hi Folks, I'm running a five-step path-following algorithm on a movie graph with 120K vertices and 400K edges. The graph has vertices for actors, directors, movies, users, and user ratings, and my Scala code is walking the path rating movie rating user rating. There are 75K rating nodes and each has ~100 edges. My program iterates over each path item, calling aggregateMessages() then joinVertices() each time, and then processing that result on the next iteration. The program never finishes the second 'rating' step, which makes sense as, IIUC from my back-of-the-napkin estimate, the intermediate result would have ~4B active vertices. Spark is version 1.2.0 and running in standalone mode on a small cluster of five hosts: four compute nodes and a head node, where the compute nodes have 4 cores and 32GB RAM each, and the head has 32 cores and 128GB RAM. After restarting Spark just now, the Master web UI shows 15 workers (5 dead), two per node, with cores and memory listed as 32 (0 Used) and 125.0 GB (0.0 B Used) on the two head node workers and 4 (0 Used) and 30.5 GB (0.0 B Used) for the 8 workers running on the compute nodes. (Note: I don't understand why it's configured to run two workers per node.) The small Spark example programs run to completion. I've listed the console output at http://pastebin.com/DPECKgQ9 (I'm running in spark-shell). I hope you can provide some advice on things to try next (e.g., configuration vars). My guess is the cluster is running out of memory, though I think it has adequate aggregate RAM to handle this app. Thanks very much -- matt
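Not Matthew's code, but for readers unfamiliar with the pattern, a stripped-down sketch of one aggregateMessages/joinVertices hop on a toy Graph[Int, Int]:

import org.apache.spark.graphx._

// One hop: send each source's attribute along its out-edges, sum the messages
// arriving at every destination vertex, then fold the sum back into the graph.
def oneHop(g: Graph[Int, Int]): Graph[Int, Int] = {
  val msgs: VertexRDD[Int] = g.aggregateMessages[Int](
    ctx => ctx.sendToDst(ctx.srcAttr),   // message sent per edge
    _ + _                                // merge messages at the same vertex
  )
  g.joinVertices(msgs)((_, _, sum) => sum)
}

When a step like this is applied repeatedly, it usually helps to cache the new graph and unpersist the previous one between iterations, so intermediate results don't pile up in memory.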
Re: Why are there different parts in my CSV?
Hello Akhil, thank you for your continued help! 1) So, if I can write it programmatically after every batch, then technically I should be able to have just the csv files in one directory. However, can the /desired/output/file.txt be in HDFS? If it is only local, I am not sure it will help me for the use case I describe in 2). So can I do something like this: hadoop fs -getmerge /output/dir/on/hdfs desired/dir/in/hdfs ? 2) Just to make sure I am going down the right path... my end use case is to use Hive or HBase to create a database from these csv files. Is there an easy way for Hive to read /user/test/many sub directories/with one csv file in each into a table? Thank you! On Sat, Feb 14, 2015 at 12:39 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Simplest way would be to merge the output files at the end of your job, like: hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt If you want to do it programmatically, then you can use the FileUtil.copyMerge API, like: FileUtil.copyMerge(FileSystem of source (hdfs), /output-location, FileSystem of destination (hdfs), path to the merged file /merged-output, true (to delete the original dir), null)
Spark Web UI Doesn't Open in Yarn-Client Mode
Hi, I am running a 3-node Spark cluster on EMR. While running a job I see only 1 executor running. Does that mean only 1 of the nodes is being used? (It seems so from the Spark documentation on the default mode (LOCAL).) When I switch to yarn-client mode the Spark Web UI doesn't open. How do I view the job running details now?
Re: Why are there different parts in my CSV?
Thanks Akhil for the link. Is there a reason why there is a new directory created for each batch? Is this a format that is easily readable by other applications such as Hive/Impala? On Sat, Feb 14, 2015 at 1:28 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can directly write to HBase with Spark. Here's an example of doing that: https://issues.apache.org/jira/browse/SPARK-944 Thanks Best Regards
Re: Why are there different parts in my CSV?
Keep in mind that if you repartition to 1 partition, you are only using 1 task to write the output, and potentially only 1 task to compute some parent RDDs. You lose parallelism. The files-in-a-directory output scheme is standard for Hadoop, and for a reason. Therefore I would consider separating this concern and merging the files afterwards if you need to. On Sat, Feb 14, 2015 at 8:39 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Simplest way would be to merge the output files at the end of your job, like: hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt If you want to do it programmatically, then you can use the FileUtil.copyMerge API, like: FileUtil.copyMerge(FileSystem of source (hdfs), /output-location, FileSystem of destination (hdfs), path to the merged file /merged-output, true (to delete the original dir), null)
SparkStreaming Low Performance
I'm getting low performance while parsing JSON data. My cluster setup is Spark 1.2.0 with 10 nodes, each having 15 GB of memory and 4 cores. I tried both scala.util.parsing.json.JSON and fasterxml's Jackson parser. This is what I basically do:
//Approach 1:
val jsonStream = myDStream.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String,Any]](x)
})
jsonStream.count().print()
//Approach 2:
val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
jsonStream2.count().print()
It takes around 15-20 seconds to process/parse 35k JSON documents (containing nested documents and arrays) which I put in the stream. Is there a better approach/parser to process it faster? I also tried it with mapPartitions but it did not make any difference. Thanks Best Regards
Re: Is Ubuntu server or desktop better for spark cluster
I don't think this is very specific to Spark. Spark is not a desktop user application. While it is perfectly possible to run it on such a distro, I would think a server distro is more appropriate. On Sat, Feb 14, 2015 at 3:05 PM, Joanne Contact joannenetw...@gmail.com wrote: Hi gurus, I am trying to set up a real Linux machine (not a VM) where I will install Spark and also Hadoop. I plan on learning about clusters. I found Ubuntu has desktop and server versions. Does it matter? Thanks!! J
Re: SparkStreaming Low Performance
I see. I'd really benchmark how the parsing performs outside Spark (in a tight loop or something). If *that* is slow, you know it's the parsing. If not, it's not the parsing. Another thing you want to look at is CPU usage. If the actual parsing really is the bottleneck, you should see very high CPU utilization. If not, it's not the parsing per se but rather the ability to feed the messages to the parsing library. On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Ah my bad, it works without the serializable exception. But there is not much performance difference though. Thanks Best Regards
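For what it's worth, the isolated benchmark being suggested here can be as simple as the sketch below; loadSampleJson() is a hypothetical helper returning some representative documents as a Seq[String]:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// One shared mapper, as Jackson recommends.
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)

val samples: Seq[String] = loadSampleJson()   // hypothetical helper with test documents
val start = System.nanoTime()
samples.foreach(s => mapper.readValue[Map[String, Any]](s))
val elapsedMs = (System.nanoTime() - start) / 1e6
println(s"Parsed ${samples.size} documents in $elapsedMs ms")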
Re: Spark Web UI Doesn't Open in Yarn-Client Mode
On YARN you need to first go to the ResourceManager UI, find your job, and click the link for the UI there. From: Puneet Kumar Ojha puneet.ku...@pubmatic.com Sent: Saturday, February 14, 2015 5:25 AM To: user@spark.apache.org Hi, I am running a 3-node Spark cluster on EMR. While running a job I see only 1 executor running. Does that mean only 1 of the nodes is being used? (It seems so from the Spark documentation on the default mode (LOCAL).) When I switch to yarn-client mode the Spark Web UI doesn't open. How do I view the job running details now?
Re: SparkException: Task not serializable - Jackson Json
To get past this you can move the mapper creation code down into the closure. It's then created on the worker node so it doesn't need to be serialized.
// Parse it into a specific case class. We use flatMap to handle errors
// by returning an empty list (None) if we encounter an issue and a
// list with one element if everything is ok (Some(_)).
val result = input.flatMap(record => {
  try {
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    Some(mapper.readValue(record, classOf[Company]))
  } catch {
    case e: Exception => None
  }
})
result.map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)
BUT for more efficiency look into creating the mapper in a *mapPartitions* iterator, which means it'll be created on the worker node but only once per partition and not for every row like above.
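For reference, a sketch of the mapPartitions variant suggested above, so one mapper is built per partition rather than per record; Company, input, and outputFile are from the original code, and the write-back reuses the same per-partition mapper:

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

val parsed = input.mapPartitions { records =>
  // Built once per partition, on the worker, so nothing needs to be serialized.
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  records.flatMap { record =>
    try Some(mapper.writeValueAsString(mapper.readValue(record, classOf[Company])))
    catch { case e: Exception => None }
  }
}
parsed.saveAsTextFile(outputFile)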
Re: SparkStreaming Low Performance
Ah my bad, it works without the serializable exception. But there is not much performance difference though. Thanks Best Regards On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn Over this piece of code:
object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
}
val jsonStream = myDStream.map(x => {
  Holder.mapper.readValue[Map[String,Any]](x)
})
Thanks Best Regards
Re: SparkStreaming Low Performance
Huh, that would come to 6.5ms per JSON document. That does feel like a lot, but if your JSON documents are big enough, I guess you could get that sort of processing time. Jackson is more or less the most efficient JSON parser out there, so unless the Scala API is somehow affecting it, I don't see any better way. If you only need to read parts of the JSON, you could look into exploiting Jackson's stream parsing API http://wiki.fasterxml.com/JacksonStreamingApi . I guess the good news is you can throw machines at it. You could also look into other serialization frameworks. On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks again! It's with the parser only, just tried the parser https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark, and it took 52 sec to process 8k JSON records. Not sure if there's an efficient way to do this in Spark; I know if I use SparkSQL with SchemaRDD and all it will be much faster, but I need this in Spark Streaming. Thanks Best Regards
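For reference, a minimal sketch of the Jackson streaming API mentioned above, pulling one field out of a document without binding the whole thing; the field name "id" is only an example:

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

val factory = new JsonFactory()

// Returns the first "id" field encountered in the document, if any.
def extractId(json: String): Option[String] = {
  val parser = factory.createParser(json)
  try {
    var result: Option[String] = None
    while (parser.nextToken() != null && result.isEmpty) {
      if (parser.getCurrentToken == JsonToken.FIELD_NAME && parser.getCurrentName == "id") {
        parser.nextToken()           // advance to the field's value
        result = Some(parser.getText)
      }
    }
    result
  } finally parser.close()
}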
Is Ubuntu server or desktop better for spark cluster
Hi gurus, I am trying to set up a real Linux machine (not a VM) where I will install Spark and also Hadoop. I plan on learning about clusters. I found Ubuntu has desktop and server versions. Does it matter? Thanks!! J
Re: SparkStreaming Low Performance
(adding back user) Fair enough. Regarding the serialization exception, the hack I use is to have an object with a transient lazy field, like so:
object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper()
}
This way, the ObjectMapper will be instantiated at the destination and you can share the instance. On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the reply Enno, in my case the rate from the stream is not the bottleneck as I'm able to consume all those records at a time (have tested it). And regarding the ObjectMapper, if I take it outside of my map operation then it throws serialization exceptions (Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier). Thanks Best Regards On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote: If I were you I'd first parse some test jsons in isolation (outside Spark) to determine if the bottleneck is really the parsing. There are plenty of other places that could be affecting your performance, like the rate you are able to read from your stream source etc. Apart from that, I notice that you are instantiating the ObjectMapper every time. This is quite expensive and Jackson recommends you to share the instance. However, if you tried other parsers / mapPartitions without success, this probably won't fix your problem either. On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I'm getting low performance while parsing JSON data. My cluster setup is Spark 1.2.0 with 10 nodes, each having 15 GB of memory and 4 cores. I tried both scala.util.parsing.json.JSON and fasterxml's Jackson parser. This is what I basically do:
//Approach 1:
val jsonStream = myDStream.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String,Any]](x)
})
jsonStream.count().print()
//Approach 2:
val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
jsonStream2.count().print()
It takes around 15-20 seconds to process/parse 35k JSON documents (containing nested documents and arrays) which I put in the stream. Is there a better approach/parser to process it faster? I also tried it with mapPartitions but it did not make any difference. Thanks Best Regards
Re: SparkStreaming Low Performance
Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn Over this piece of code:
object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
}
val jsonStream = myDStream.map(x => {
  Holder.mapper.readValue[Map[String,Any]](x)
})
Thanks Best Regards On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote: (adding back user) Fair enough. Regarding the serialization exception, the hack I use is to have an object with a transient lazy field, like so:
object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper()
}
This way, the ObjectMapper will be instantiated at the destination and you can share the instance.
Re: SparkStreaming Low Performance
Thanks again! It's with the parser only, just tried the parser https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark, and it took 52 sec to process 8k JSON records. Not sure if there's an efficient way to do this in Spark; I know if I use SparkSQL with SchemaRDD and all it will be much faster, but I need this in Spark Streaming. Thanks Best Regards On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji eshi...@gmail.com wrote: I see. I'd really benchmark how the parsing performs outside Spark (in a tight loop or something). If *that* is slow, you know it's the parsing. If not, it's not the parsing. Another thing you want to look at is CPU usage. If the actual parsing really is the bottleneck, you should see very high CPU utilization. If not, it's not the parsing per se but rather the ability to feed the messages to the parsing library.
Re: Is Ubuntu server or desktop better for spark cluster
For a beginner Ubuntu Desktop is recommended as it includes a GUI and is easier to install. Also refer to the ServerFaq - Community Help Wiki on help.ubuntu.com (Frequently Asked Questions about the Ubuntu Server Edition, intended to help system administrators and users of the Ubuntu Server edition). From: Joanne Contact joannenetw...@gmail.com To: user@spark.apache.org Sent: Saturday, February 14, 2015 7:05 AM Subject: Is Ubuntu server or desktop better for spark cluster Hi gurus, I am trying to set up a real Linux machine (not a VM) where I will install Spark and also Hadoop. I plan on learning about clusters. I found Ubuntu has desktop and server versions. Does it matter? Thanks!! J
Re: SparkSQL and star schema
Yes. Though for good performance it is usually important to make sure that you have statistics for the smaller dimension tables. Today that can be done by creating them in the hive metastore and running ANALYZE TABLE table COMPUTE STATISTICS noscan. In Spark 1.3 this will happen automatically when you create a table using the datasources API. CREATE TABLE myTable USING parquet OPTIONS (path /...) On Fri, Feb 13, 2015 at 2:06 AM, Paolo Platter paolo.plat...@agilelab.it wrote: Hi, is SparkSQL + Parquet suitable to replicate a star schema ? Paolo Platter AgileLab CTO
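For concreteness, a sketch of what Michael describes from a HiveContext; the table names and path are placeholders:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Collect size statistics for a small dimension table registered in the Hive
// metastore, so the planner can consider broadcasting it when joining the fact table.
hiveContext.sql("ANALYZE TABLE dim_customer COMPUTE STATISTICS noscan")
// Spark 1.3 datasources syntax mentioned above, where statistics are then handled automatically:
hiveContext.sql("CREATE TABLE dim_product USING parquet OPTIONS (path '/warehouse/dim_product')")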
Re: Shuffle write increases in spark 1.2
I double-checked the 1.2 feature list and found out that the new sort-based shuffle manager has nothing to do with HashPartitioner :- Sorry for the misinformation. On the other hand, this may explain the increase in shuffle spill as a side effect of the new shuffle manager; let me revert spark.shuffle.manager to hash and see if it makes things better (or worse, as the benchmark in https://issues.apache.org/jira/browse/SPARK-3280 indicates).
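For anyone else experimenting with this, switching the shuffle implementation is just a configuration property (the valid values in 1.2 are "sort" and "hash"), e.g. on the SparkConf:

import org.apache.spark.SparkConf

// Revert to the pre-1.2 hash-based shuffle to compare shuffle write sizes.
val conf = new SparkConf().set("spark.shuffle.manager", "hash")

The same property can also be set in conf/spark-defaults.conf instead of in code.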
Re: Shuffle write increases in spark 1.2
Same problem here: shuffle write increased from 10G to over 64G. Since I'm running on Amazon EC2 this always causes the temporary folder to consume all the disk space. Still looking for a solution. BTW, the 64G shuffle write is encountered when shuffling a pairRDD with HashPartitioner, so it's not related to Spark 1.2.0's new features. Yours Peng
Re: SQLContext.applySchema strictness
Doing runtime type checking is very expensive, so we only do it when necessary (i.e. when you perform an operation like adding two columns together). On Sat, Feb 14, 2015 at 2:19 AM, nitin nitin2go...@gmail.com wrote: AFAIK, this is the expected behavior. You have to make sure that the schema matches the row. It won't give any error when you apply the schema as it doesn't validate the nature of the data.
Re: Build spark failed with maven
Hi, this was not reproduced for me, what kind of jdk are you using for the zinc server ? Regards, Olivier. 2015-02-11 5:08 GMT+01:00 Yi Tian tianyi.asiai...@gmail.com: Hi, all I got an ERROR when I build spark master branch with maven (commit: 2d1e916730492f5d61b97da6c483d3223ca44315) [INFO] [INFO] [INFO] Building Spark Project Catalyst 1.3.0-SNAPSHOT [INFO] [INFO] [INFO] --- maven-enforcer-plugin:1.3.1:enforce (enforce-versions) @ spark-catalyst_2.10 --- [INFO] [INFO] --- build-helper-maven-plugin:1.8:add-source (add-scala-sources) @ spark-catalyst_2.10 --- [INFO] Source directory: /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala added. [INFO] [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-catalyst_2.10 --- [INFO] [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ spark-catalyst_2.10 --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] skip non existing resourceDirectory /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/resources [INFO] Copying 3 resources [INFO] [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-catalyst_2.10 --- [INFO] Using zinc server for incremental compilation [INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null) [info] Compiling 69 Scala sources and 3 Java sources to /Users/tianyi/github/community/apache-spark/sql/catalyst/target/scala-2.10/classes...[error] /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:314: polymorphic expression cannot be instantiated to expected type; [error] found : [T(in method apply)]org.apache.spark.sql.catalyst.dsl.ScalaUdfBuilder[T(in method apply)] [error] required: org.apache.spark.sql.catalyst.dsl.package.ScalaUdfBuilder[T(in method functionToUdfBuilder)] [error] implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, T]): ScalaUdfBuilder[T] = ScalaUdfBuilder(func) Any suggestion?
Re: SQLContext.applySchema strictness
Would it make sense to add an optional validate parameter to applySchema() that defaults to false, both to give users the option to check the schema immediately and to make the default behavior clearer? On Sat Feb 14 2015 at 9:18:59 AM Michael Armbrust mich...@databricks.com wrote: Doing runtime type checking is very expensive, so we only do it when necessary (i.e., when you perform an operation like adding two columns together). On Sat, Feb 14, 2015 at 2:19 AM, nitin nitin2go...@gmail.com wrote: AFAIK, this is the expected behavior. You have to make sure that the schema matches the row. It won't give any error when you apply the schema, as it doesn't validate the nature of the data.
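In the spirit of that suggestion, a rough sketch of what an opt-in check could look like today, done in user code before calling applySchema rather than inside it (the helper name is hypothetical and only a few primitive types are covered):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._

    // Hypothetical helper (not part of Spark): eagerly verify each Row against
    // the StructType; a mismatch fails the job immediately instead of later.
    def validateAgainstSchema(rows: RDD[Row], schema: StructType): Unit = {
      rows.foreach { row =>
        require(row.length == schema.fields.length,
          s"Row has ${row.length} fields but the schema has ${schema.fields.length}")
        schema.fields.zipWithIndex.foreach { case (field, i) =>
          val value = row(i)
          val ok = value == null || (field.dataType match {
            case StringType  => value.isInstanceOf[String]
            case IntegerType => value.isInstanceOf[Int]
            case DoubleType  => value.isInstanceOf[Double]
            case _           => true // other types are not checked in this sketch
          })
          require(ok, s"Field '${field.name}' expected ${field.dataType} but got: $value")
        }
      }
    }

The trade-off is exactly the cost Michael mentions: this adds a full pass over the data, which is why it would need to be optional.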
Re: Why are there different parts in my CSV?
Thanks Sean and Akhil! I will take out the repartition(1). Please let me know if I understood this correctly: Spark Streaming writes data like this:
foo-1001.csv/part-x, part-x
foo-1002.csv/part-x, part-x
When I see this on Hue, the csv's appear to me as *directories*, but if I understand correctly, they will appear as csv *files* to other Hadoop ecosystem tools? And, if I understand Tathagata's answer correctly, other Hadoop-based tools, such as Hive, will be able to create a table based off the multiple foo-10x.csv directories? Thank you, I really appreciate the help! On Sat, Feb 14, 2015 at 3:20 AM, Sean Owen so...@cloudera.com wrote: Keep in mind that if you repartition to 1 partition, you are only using 1 task to write the output, and potentially only 1 task to compute some parent RDDs. You lose parallelism. The files-in-a-directory output scheme is standard for Hadoop, and for a reason. Therefore I would consider separating this concern and merging the files afterwards if you need to.
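For the "merge the files afterwards" option mentioned above, a minimal sketch of calling Hadoop's FileUtil.copyMerge from a Scala job; the two paths are placeholders, not taken from the thread:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    // Sketch: merge all part files under one batch's output directory
    // into a single HDFS file.
    val hadoopConf = new Configuration()
    val fs = FileSystem.get(hadoopConf)
    FileUtil.copyMerge(
      fs, new Path("/user/ec2-user/foo-1001.csv"),        // source directory of part files
      fs, new Path("/user/ec2-user/merged/foo-1001.csv"), // destination single file
      true,        // delete the source directory after merging
      hadoopConf,
      null)        // optional string inserted between merged files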
Re: Why are there different parts in my CSV?
Okay, got it, thanks for the help Sean! On Sat, Feb 14, 2015 at 1:08 PM, Sean Owen so...@cloudera.com wrote: No, they appear as directories + files to everything. Lots of tools are used to taking an input that is a directory of part files though. You can certainly point MR, Hive, etc. at a directory of these files.
Re: Why are there different parts in my CSV?
No, they appear as directories + files to everything. Lots of tools are used to taking an input that is a directory of part files though. You can certainly point MR, Hive, etc. at a directory of these files.
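To make the "point Hive at a directory of these files" idea concrete, a sketch using HiveContext from Scala; the table name, columns, delimiter, and location are invented for illustration, and this points at a single batch's output directory (covering many batch directories at once typically needs partitions or recursive-input settings, which are not shown here):

    import org.apache.spark.sql.hive.HiveContext

    // Sketch: an external table whose data is every file under the directory,
    // including the part files written by a streaming batch.
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("""
      CREATE EXTERNAL TABLE IF NOT EXISTS word_counts (word STRING, cnt INT)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      LOCATION '/user/ec2-user/foo-1001.csv'
    """)
    hiveContext.sql("SELECT * FROM word_counts LIMIT 10").collect()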