Re: Choice of IDE for Spark
We use Jupyter on Hadoop (https://jupyterhub-on-hadoop.readthedocs.io/en/latest/) for developing Spark jobs directly inside the cluster they will run on. With that you have direct access to YARN and HDFS (fully secured) without any migration steps. You can control the size of your Jupyter YARN container and, of course, of your Spark session. Regards, Christian

Original message - On 2 Oct 2021, 01:21, Holden Karau wrote:

> Personally I like Jupyter notebooks for my interactive work, and then once I've done my exploration I switch back to emacs with either scala-metals or Python mode.
>
> I think the main takeaway is: do what feels best for you, there is no one true way to develop in Spark.
>
> On Fri, Oct 1, 2021 at 1:28 AM Mich Talebzadeh <[mich.talebza...@gmail.com][mich.talebzadeh_gmail.com]> wrote:
>
> > Thanks guys for your comments.
> >
> > I agree with you, Florian, that opening a terminal, say in VSC, allows you to run a shell script (an sh file) to submit your Spark code; however, this really makes sense if your IDE is running on a Linux host submitting a job to a Kubernetes or YARN cluster.
> >
> > For Python, I will go with PyCharm, which is specific to the Python world. With Spark, I have used IntelliJ with the Spark plug-in on Mac for development work. Then I created a JAR file, gzipped the whole project, scp'd it to an IBM sandbox, untarred it and ran it with a pre-prepared shell script with an environment plugin for dev, test, staging etc.
> >
> > An IDE is also useful for looking at csv or tsv type files, or for converting json from one form to another. For json validation, especially if the file is large, you may be restricted from loading the file into a web json validator because of the risk of exposing proprietary data. There is a tool called [jq][jq] (a lightweight and flexible command-line JSON processor) that comes in pretty handy to validate json.
> > Download and install it on your OS and run it as
> >
> >     zcat .tgz | jq
> >
> > That will validate the whole tarred and gzipped json file. Otherwise, most of these IDE tools come with add-on plugins for various needs. My preference would be to use the best available IDE for the job. VSC I would consider a general-purpose tool. If all else fails, one can always use OS tools like vi, vim, sed and awk.
> >
> > Cheers
> >
> > [view my Linkedin profile][]
> >
> > **Disclaimer:** Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
> >
> > On Fri, 1 Oct 2021 at 06:55, Florian CASTELAIN <[florian.castel...@redlab.io][Florian.CASTELAIN_redlab.io]> wrote:
> >
> > > Hello.
> > >
> > > Any "evolved" code editor allows you to create tasks (or builds, or whatever they are called in the IDE you chose). If you do not find anything that packages by default all you need, you could just create your own tasks.
> > >
> > > *For yarn, one needs to open a terminal and submit from there.*
> > >
> > > You can create task(s) that launch your yarn commands.
> > >
> > > *With VSC, you get stuff for working with json files but I am not sure about a plugin for Python*
> > >
> > > In your json task configuration, you can launch whatever you want: python, shell.
> > > I bet you could launch your favorite video game (just make a task called "let's have a break").
> > >
> > > Just to say,
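As an aside, the jq validation step discussed above can also be reproduced without jq, using only Python's standard library. This is a minimal sketch; the file it writes under the temp directory is purely a hypothetical placeholder for a real gzipped JSON export:

```python
import gzip
import json
import os
import tempfile

def validate_gzipped_json(path):
    """Parse a gzipped JSON file; raises an error if the JSON is invalid.
    This mirrors what `zcat file | jq` does on the command line."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return json.load(f)

# Hypothetical round trip: write a small gzipped JSON file, then validate it.
path = os.path.join(tempfile.gettempdir(), "data.json.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    json.dump({"firstname": "Jack", "valid": True}, f)

doc = validate_gzipped_json(path)
print(doc["firstname"])  # -> Jack
```

Like jq, this keeps the proprietary data on the local machine instead of pasting it into a web validator.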
Re: Benchmarks on Spark running on Yarn versus Spark on K8s
Does anyone know where the data for this benchmark was stored? Spark on YARN gets its performance from data locality via co-allocation of the YARN NodeManager and the HDFS DataNode, not from the job scheduler, right? Regards, z0ltrix

Original message - On 5 July 2021, 21:27, Madaditya .Maddy wrote:

> I came across an article that benchmarked spark on k8s vs yarn by Datamechanics.
>
> Link: https://www.datamechanics.co/blog-post/apache-spark-performance-benchmarks-show-kubernetes-has-caught-up-with-yarn
>
> -Regards
> Aditya
>
> On Mon, Jul 5, 2021, 23:49 Mich Talebzadeh <[mich.talebza...@gmail.com][mich.talebzadeh_gmail.com]> wrote:
>
> > Thanks Yuri. Those are very valid points.
> >
> > Let me clarify my point. Let us assume that we will be using Yarn versus K8s doing the same job. Spark-submit will use Yarn in the first instance and will then switch to using k8s for the same task.
> >
> > 1. Have there been such benchmarks?
> > 2. When should I choose PaaS versus k8s, for example for small to medium size jobs?
> > 3. I can see the flexibility of running Spark on Dataproc; then again, some may argue that k8s is the way forward.
> > 4. Bear in mind that I am only considering Spark. For example, for Kafka and Zookeeper we opt for dockers, as they do a single function.
> >
> > Cheers,
> >
> > Mich
> > On Mon, 5 Jul 2021 at 19:06, "Yuri Oleynikov (יורי אולייניקוב)" <[yur...@gmail.com][yurkao_gmail.com]> wrote:
> >
> > > Not a big expert on Spark, but I don't really understand how you are going to compare, and what? Reading and writing to and from HDFS? How is that related to yarn and k8s? These are resource managers (YARN: Yet Another Resource Negotiator): what and how much to allocate, and when (cpu, ram).
> > >
> > > Local disk spilling? That depends on disk throughput.
> > >
> > > So what are you going to measure?
> > >
> > > Best regards
> > >
> > > > On 5 Jul 2021, at 20:43, Mich Talebzadeh <[mich.talebza...@gmail.com][mich.talebzadeh_gmail.com]> wrote:
> > > >
> > > > I was curious to know if there are benchmarks around comparing Spark on Yarn to Spark on Kubernetes.
> > > >
> > > > This question arose because traditionally in Google Cloud we have been using Spark on Dataproc clusters. [Dataproc][Dataproc] provides Spark, Hadoop plus others (optional install) for data and analytic processing. It is PaaS.
> > > >
> > > > Now they have GKE clusters as well, and have also introduced [Apache Spark with Cloud Dataproc on Kubernetes][], which allows one to submit Spark jobs to k8s using Dataproc as a stub platform to submit the job, as below, from the cloud console or locally:
> > > >
> > > >     gcloud dataproc jobs submit pyspark --cluster="dataproc-for-gke" gs://bucket/testme.py --region="europe-west2" --py-files gs://bucket/DSBQ.zip
> > > >     Job [e5fc19b62cf744f0b13f3e6d9cc66c19] submitted.
> > > >     Waiting for job output...
> > > > At the moment it is a struggle to see what merits using k8s instead of Dataproc, bar notebooks etc. Actually there is not much literature around on PySpark on k8s.
> > > >
> > > > For me, Spark on bare metal is the preferred option, as I cannot see how one can pigeonhole Spark into a container and make it performant, but I may be totally wrong.
> > > >
> > > > Thanks
RE: [Spark SQL]: Slow insertInto overwrite if target table has many partitions
Hi, How do I get the filename from textFileStream using Spark Streaming? Thanks a mill. Standard Bank email disclaimer and confidentiality note: Please go to www.standardbank.co.za/site/homepage/emaildisclaimer.html to read our email disclaimer and confidentiality note. Kindly email disclai...@standardbank.co.za (no content or subject line necessary) if you cannot view that page and we will email our email disclaimer and confidentiality note to you.
RE: Does pyspark support python3.6?
Dear Spark users,

I have been asked to provide a presentation / business case on why to use Spark and Java as the ingestion tool for HDFS and Hive, and why to move away from an ETL tool. Could you be so kind as to provide some pros and cons on this? I have the following:

Pros:
- In-house build: code can be changed on the fly to suit business needs.
- The software is free.
- Can run out of the box on all nodes.
- Will support all Apache-based software.
- Fast due to in-memory processing.
- The Spark UI can visualise execution.
- Supports checkpointed data loads.
- Supports a schema registry for custom schemas and inference.
- Supports YARN execution.
- MLlib can be used if needed.
- Data lineage support due to Spark usage.

Cons:
- Skills needed to maintain and build.
- The in-memory capability can become a bottleneck if not managed.
- No ETL GUI.

Maybe point me to an article if you have one. Thanks a mill.

Christian
RE: Is Spark suited for this use case?
Hi, We basically have the same scenario, but worldwide; as we have bigger datasets we use OGG --> local --> Sqoop into Hadoop. By all means you can have Spark reading the Oracle tables and then make changes to the data where needed that would not be done in the Sqoop query, e.g. fraud detection on transaction records. But sometimes the simplest way is the best: unless you need a change or need more, I would advise not using another hop. I would rather move away from files, as OGG can do files and direct table loading, then Sqoop for the rest. Simpler is better. Hope this helps. C.

From: Saravanan Thirumalai [mailto:saravanan.thiruma...@gmail.com]
Sent: Monday, 16 October 2017 4:29 AM
To: user@spark.apache.org
Subject: Is Spark suited for this use case?

We are an investment firm and have an MDM platform in Oracle at a vendor location, and we use Oracle GoldenGate to replicate data to our data center for reporting needs. Our data is not big data (total size 6 TB including 2 TB of archive data). Moreover, our data doesn't get updated often: nightly once (around 50 MB) and some correction transactions during the day (<10 MB). We don't have external users, and hence data doesn't grow in real time like e-commerce. When we replicate data from source to target, we transfer data through files. So, if there are DML operations (corrections) during the day on a source table, the corresponding file would have perhaps 100 lines of table data that need to be loaded into the target database. Due to the low volume of data we designed this through Informatica, and it runs in less than 2-5 minutes. Can Spark be used in this case, or would it be an overkill of technology?
[ANNOUNCE] Apache Bahir 2.1.0 Released
The Apache Bahir community is pleased to announce the release of Apache Bahir 2.1.0 which provides the following extensions for Apache Spark 2.1.0: - Akka Streaming - MQTT Streaming - MQTT Structured Streaming - Twitter Streaming - ZeroMQ Streaming For more information about Apache Bahir and to download the latest release go to: http://bahir.apache.org The Apache Bahir streaming connectors are also available at: https://spark-packages.org/?q=bahir --- Best regards, Christian Kadner
[ANNOUNCE] Apache Bahir 2.0.2
The Apache Bahir PMC approved the release of Apache Bahir 2.0.2 which provides the following extensions for Apache Spark 2.0.2: - Akka Streaming - MQTT Streaming - MQTT Structured Streaming - Twitter Streaming - ZeroMQ Streaming For more information about Apache Bahir and to download the latest release go to: http://bahir.apache.org The Apache Bahir streaming connectors are also available at: https://spark-packages.org/?q=bahir --- Best regards, Christian Kadner
Re: Spark SQL Nested Array of JSON with empty field
If that's your JSON file, then the first problem is that it's incorrectly formatted: Spark's JSON reader expects one complete JSON object per line, not pretty-printed, concatenated objects. Apart from that, you can just read the JSON into a DataFrame with sqlContext.read.json() and then select directly on the DataFrame without having to register a temporary table: jsonDF.select("firstname", "address.state", ...). Works for me (with a properly formatted JSON document). To make sure that your JSON is read correctly, check jsonDF.printSchema. If there is an entry for corrupt records (or similar), you know there's a problem with the JSON structure.

On 3 June 2016 at 21:31, Jerry Wong wrote:

> Hi,
>
> I met a problem with an empty field in a nested JSON file with Spark SQL. For instance, there are two records in the JSON file as follows:
>
> {
>   "firstname": "Jack",
>   "lastname": "Nelson",
>   "address": {
>     "state": "New York",
>     "city": "New York"
>   }
> }{
>   "firstname": "Landy",
>   "middlename": "Ken",
>   "lastname": "Yong",
>   "address": {
>     "state": "California",
>     "city": "Los Angles"
>   }
> }
>
> I use Spark SQL to query the files like:
>
> val row = sqlContext.sql("SELECT firstname, middlename, lastname, address.state, address.city FROM jsontable")
>
> The compiler tells me the error on line 1: no "middlename". How do I handle this case in SQL?
>
> Many thanks in advance!
> Jerry
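As an illustration outside Spark, the formatting point can be sketched in plain Python. This is not Spark's reader, just the same one-object-per-line (JSON Lines) layout Spark's JSON source expects, plus defensive handling of the record that has no middlename:

```python
import json

# The two records rewritten as JSON Lines: one complete JSON object per
# line, which is the layout Spark's JSON data source expects.
json_lines = [
    '{"firstname": "Jack", "lastname": "Nelson", '
    '"address": {"state": "New York", "city": "New York"}}',
    '{"firstname": "Landy", "middlename": "Ken", "lastname": "Yong", '
    '"address": {"state": "California", "city": "Los Angles"}}',
]

rows = []
for line in json_lines:
    rec = json.loads(line)
    rows.append((
        rec["firstname"],
        rec.get("middlename"),   # None when the field is absent
        rec["lastname"],
        rec["address"]["state"],
    ))

print(rows[0])  # -> ('Jack', None, 'Nelson', 'New York')
```

The dict.get call plays the same role as Spark tolerating a nullable column: the missing field becomes None/null instead of an error.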
RE: Spark Streaming heap space out of memory
While it has heap space, batches run well below 15 seconds. Once it starts to run out of space, processing time takes about 1.5 minutes. Scheduling delay is around 4 minutes and total delay around 5.5 minutes. I usually shut it down at that point. The number of stages (and pending stages) does seem to be quite high and increases over time:

Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks: Succeeded/Total
4584 | foreachRDD at HDFSPersistence.java:52 | 2016/05/30 16:23:52 | 1.9 min | 36/36 (4964 skipped) | 285/285 (28026 skipped)
4586 | transformToPair at SampleCalculator.java:88 | 2016/05/30 16:25:02 | 0.2 s | 1/1 | 4/4
4585 | (Unknown Stage Name) | 2016/05/30 16:23:52 | 1.2 min | 1/1 | 1/1
4582 | (Unknown Stage Name) | 2016/05/30 16:21:51 | 48 s | 1/1 (4063 skipped) | 12/12 (22716 skipped)
4583 | (Unknown Stage Name) | 2016/05/30 16:21:51 | 48 s | 1/1 | 1/1
4580 | (Unknown Stage Name) | 2016/05/30 16:16:38 | 4.0 min | 36/36 (4879 skipped) | 285/285 (27546 skipped)
4581 | (Unknown Stage Name) | 2016/05/30 16:16:38 | 0.1 s | 1/1 | 4/4
4579 | (Unknown Stage Name) | 2016/05/30 16:15:53 | 45 s | 1/1 | 1/1
4578 | (Unknown Stage Name) | 2016/05/30 16:14:38 | 1.3 min | 1/1 (3993 skipped) | 12/12 (22326 skipped)
4577 | (Unknown Stage Name) | 2016/05/30 16:14:37 | 0.8 s | 1/1 | 1/1

Is this what you mean by pending stages? I have taken a few heap dumps, but I'm not sure what I am looking for in terms of problematic classes.

From: Shahbaz [mailto:shahzadh...@gmail.com]
Sent: 2016, May, 30 3:25 PM
To: Dancuart, Christian
Cc: user
Subject: Re: Spark Streaming heap space out of memory

Hi Christian,

* What is the processing time of each of your batches? Is it exceeding 15 seconds?
* How many jobs are queued?
* Can you take a heap dump and see which objects are occupying the heap?

Regards,
Shahbaz

On Tue, May 31, 2016 at 12:21 AM, christian.dancu...@rbc.com wrote:

Hi All,

We have a Spark Streaming v1.4/Java 8 application that slows down and eventually runs out of heap space. The less driver memory, the faster it happens.
Appended is our Spark configuration and a snapshot of the heap taken using jmap on the driver process. The RDDInfo, $colon$colon and [C objects keep growing as we observe. We also tried to use G1GC, but it acts the same.

Our dependency graph contains multiple updateStateByKey() calls. For each, we explicitly set the checkpoint interval to 240 seconds. We have our batch interval set to 15 seconds, with no delays at the start of the process.

Spark configuration (Spark Driver Memory: 6GB, Spark Executor Memory: 2GB):

spark.streaming.minRememberDuration=180s
spark.ui.showConsoleProgress=false
spark.streaming.receiver.writeAheadLog.enable=true
spark.streaming.unpersist=true
spark.streaming.stopGracefullyOnShutdown=true
spark.streaming.ui.retainedBatches=10
spark.ui.retainedJobs=10
spark.ui.retainedStages=10
spark.worker.ui.retainedExecutors=10
spark.worker.ui.retainedDrivers=10
spark.sql.ui.retainedExecutions=10
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max=128m

 num    #instances      #bytes  class name
------------------------------------------
   1:      8828200   565004800  org.apache.spark.storage.RDDInfo
   2:     20794893   499077432  scala.collection.immutable.$colon$colon
   3:      9646097   459928736  [C
   4:      9644398   231465552  java.lang.String
   5:     12760625       20417  java.lang.Integer
   6:        21326       98632  [B
   7:       556959    44661232  [Lscala.collection.mutable.HashEntry;
   8:      1179788    37753216  java.util.concurrent.ConcurrentHashMap$Node
   9:      1169264    37416448  java.util.Hashtable$Entry
  10:       552707    30951592  org.apache.spark.scheduler.StageInfo
  11:       367107    23084712  [Ljava.lang.Object;
  12:       556948    22277920  scala.collection.mutable.HashMap
  13:         2787    22145568  [Ljava.util.concurrent.ConcurrentHashMap$Node;
  14:       116997    12167688  org.apache.spark.executor.TaskMetrics
  15:       360425     8650200  java.util.concurrent.LinkedBlockingQueue$Node
  16:       360417     8650008  org.apache.spark.deploy.history.yarn.HandleSparkEvent
  17:         8332     8478088  [Ljava.util.Hashtable$Entry;
  18:       351061     8425464  scala.collection.mutable.ArrayBuffer
  19:       116963     8421336  org.apache.spark.scheduler.TaskInfo
  20:       446136     7138176  scala.Some
  21:       211968     5087232  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  22:       116963     4678520  org.apache.spark.scheduler.SparkListenerTaskEnd
  23:       107679     4307160  org.apache.spark.executor.ShuffleWriteMetrics
  24:           72
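As an aside (not from the thread): histograms like the one above can be summarized with a short script when you only care about the top offenders. This is a hypothetical helper, assuming the usual "rank: instances bytes class" layout of jmap -histo output:

```python
import re

def top_classes(histo_text, n=3):
    """Parse jmap -histo style lines and return the n classes using the most bytes,
    as (bytes, instances, class_name) tuples sorted by bytes descending."""
    entries = []
    for line in histo_text.splitlines():
        m = re.match(r"\s*\d+:\s+(\d+)\s+(\d+)\s+(\S+)", line)
        if m:
            instances, nbytes, cls = int(m.group(1)), int(m.group(2)), m.group(3)
            entries.append((nbytes, instances, cls))
    return sorted(entries, reverse=True)[:n]

# Sample lines copied from the thread's histogram.
sample = """\
   1:      8828200   565004800  org.apache.spark.storage.RDDInfo
   2:     20794893   499077432  scala.collection.immutable.$colon$colon
   3:      9646097   459928736  [C
"""
print(top_classes(sample, 2))
```

Here the top entry, RDDInfo at ~565 MB, is the growing listener-bus metadata the thread goes on to discuss.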
Re: problem about RDD map and then saveAsTextFile
Internally, saveAsTextFile uses saveAsHadoopFile: https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala. The final bit of the method first creates the output path and then saves the data set. However, if there is an issue with the saveAsHadoopDataset call, the path still remains. Technically, we could add an exception-handling section that removes the path in case of problems. I think that would be a nice way of making sure that we don't litter the FS with empty files and directories in case of exceptions.

So, to your question: the parameter to saveAsTextFile is a path (not a file), and it must not already exist. Spark automatically names the files part-N, with N the partition number; this follows immediately from the partitioning scheme of the RDD itself. The real problem is that there is a problem with the calculation. You might want to fix that first. Just post the relevant bits from the log.

> Hi all:
> I've tried to execute something as below:
>
> result.map(transform).saveAsTextFile(hdfsAddress)
>
> Result is an RDD calculated by an MLlib algorithm. I submit this to YARN, and after two attempts the application failed. But the exception in the log is very misleading: it said hdfsAddress already exists. Actually, the first attempt's log showed that the exception came from the calculation of result. Though the attempt failed, it created the file. And then attempt 2 began with the exception 'file already exists'. Why was the file created when the RDD calculation had already failed? That's not so good, I think.
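The proposed exception-handling section can be sketched as follows, shown here in plain Python against the local filesystem. The Spark/HDFS calls are only mimicked; the pattern (clean up the output path when the write fails so a retry does not hit "already exists"), not the API, is the point:

```python
import os
import shutil
import tempfile

def save_with_cleanup(path, write_fn):
    """Create the output path, then run the write; if the write fails,
    remove the path so a retry does not hit 'path already exists'."""
    os.makedirs(path)  # like saveAsTextFile, this fails if the path already exists
    try:
        write_fn(path)
    except Exception:
        shutil.rmtree(path, ignore_errors=True)
        raise

out = os.path.join(tempfile.gettempdir(), "demo-output")
shutil.rmtree(out, ignore_errors=True)  # start clean for the demo

def failing_write(path):
    # Stand-in for a saveAsHadoopDataset call that blows up mid-write.
    raise RuntimeError("simulated failure during the save")

try:
    save_with_cleanup(out, failing_write)
except RuntimeError:
    pass

print(os.path.exists(out))  # -> False: the half-written path was cleaned up
```

With this pattern, the second attempt described in the quoted question would find a clean path instead of the leftover directory from the failed first attempt.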
REST-API for Killing a Streaming Application
Hello Spark Streaming gurus, for better automation I need to manage my Spark Streaming applications remotely. These applications read from Kafka, therefore have a receiver job, and are started via spark-submit. So far I have only found a REST API for killing Spark applications remotely, but that only works if the Spark application runs on a standalone Spark server.

Q1: Is it correct that the Spark WebUI does not provide any way of killing an application via a REST call? Should I file a JIRA for this?

My second question/problem is that even on the standalone Spark server I am unable to get timely/complete shutdowns: I see long delays until the spark-submit OS process terminates after the kill has been sent. This delay happens even though spark-submit immediately recognizes the kill on stdout:

org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: KILLED

But then it hangs, and to me it looks like the Spark receiver is never shut down and may even be running forever on the Spark standalone server: the application is marked in status "KILLED", but the link "Application Detail UI" still works and shows that "Streaming job running receiver 0 (Job 2)" is still running.

Note 1: when I kill the same spark-submit job using Ctrl-C, the application immediately stops as expected, including the shutdown of the Kafka receiver. On the Spark standalone server the application is then in status "FINISHED" and the link "Application Detail UI" takes me to "Application history not found (app-20160324022723-0015)".

Q2: Should the REST API call have (immediately) shut down my spark-submit job? Should I file a JIRA for this problem?

Background: even though Ctrl-C on spark-submit seems to work fine, it is no option for my Spark automation.
Besides general design concerns, my monitoring JVM may have been restarted after spark-submit was started, and therefore I cannot rely on the monitoring application having access to the spark-submit OS process. Any thoughts are much appreciated, Christian
Re: Spark Streaming - stream between 2 applications
Instead of sending the results of the one Spark app directly to the other one, you could write the results to a Kafka topic which is consumed by your other Spark application.

On Fri, Nov 20, 2015 at 12:07 PM Saiph Kappa <saiph.ka...@gmail.com> wrote:

> I think my problem persists whether I use Kafka or sockets. Or am I wrong? How would you use Kafka here?
>
> On Fri, Nov 20, 2015 at 7:12 PM, Christian <engr...@gmail.com> wrote:
>
>> Have you considered using Kafka?
>>
>> On Fri, Nov 20, 2015 at 6:48 AM Saiph Kappa <saiph.ka...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a basic Spark Streaming application like this:
>>>
>>> «
>>> ...
>>> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>> val rawStreams = (1 to numStreams).map(_ =>
>>>   ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
>>> val union = ssc.union(rawStreams)
>>>
>>> union.flatMap(line => line.split(' ')).foreachRDD(rdd => {
>>>   // TODO
>>> })
>>> ...
>>> »
>>>
>>> My question is: what is the best and fastest way to send the resulting RDDs as input to be consumed by another Spark Streaming application?
>>>
>>> I tried to add this code in place of the "TODO" comment:
>>>
>>> «
>>> val serverSocket = new ServerSocket(9998)
>>> while (true) {
>>>   val socket = serverSocket.accept()
>>>   @transient val out = new PrintWriter(socket.getOutputStream)
>>>   try {
>>>     rdd.foreach(out.write)
>>>   } catch {
>>>     case e: IOException =>
>>>       socket.close()
>>>   }
>>> }
>>> »
>>>
>>> I also tried to create a thread in the driver application code to launch the socket server and then share state (the PrintWriter object) between the driver program and tasks. But I got an exception saying that the task is not serializable - PrintWriter is not serializable (despite the @transient annotation). I know this is not a very elegant solution, but what other directions should I explore?
>>>
>>> Thanks.
Spark EC2 script on Large clusters
For starters, thanks for the awesome product!

When creating ec2 clusters of 20-40 nodes, things work great. When we create a larger cluster with the provided spark-ec2 script, it takes hours: a 200 node cluster takes 2 1/2 hours, and a 500 node cluster takes over 5 hours. One other problem we are having is that some nodes don't come up when the other ones do; the process seems to just move on, skipping the rsync and any installs on those ones.

My guess as to why it takes so long to set up a large cluster is the use of rsync. What if, instead of using rsync, you synced to S3 and then did a pdsh to pull it down on all of the machines? This is a big deal for us, and if we can come up with a good plan, we might be able to help out with the required changes.

Are there any suggestions on how to deal with some of the nodes not being ready when the process starts?

Thanks for your time,
Christian
Re: Spark EC2 script on Large clusters
Let me rephrase: EMR cost is about twice as much as the spot price, making it almost 2/3 of the overall cost.

On Thu, Nov 5, 2015 at 11:50 AM Christian <engr...@gmail.com> wrote:

> Hi Jonathan,
>
> We are using EMR now and it's costing way too much. We do spot pricing, and the EMR add-on cost is about 2/3 the price of the actual spot instance.
>
> On Thu, Nov 5, 2015 at 11:31 AM Jonathan Kelly <jonathaka...@gmail.com> wrote:
>
>> Christian,
>>
>> Is there anything preventing you from using EMR, which will manage your cluster for you? Creating large clusters would take minutes on EMR instead of hours. Also, EMR supports growing your cluster easily and recently added support for shrinking your cluster gracefully (even while jobs are running).
>>
>> ~ Jonathan
>>
>> On Thu, Nov 5, 2015 at 9:48 AM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>
>>> Yeah, as Shivaram mentioned, this issue is well-known. It's documented in SPARK-5189 <https://issues.apache.org/jira/browse/SPARK-5189> and a bunch of related issues. Unfortunately, it's hard to resolve this issue in spark-ec2 without rewriting large parts of the project. But if you take a crack at it and succeed, I'm sure a lot of people will be happy.
>>>
>>> I've started a separate project <https://github.com/nchammas/flintrock> -- which Shivaram also mentioned -- which aims to solve the problem of long launch times and other issues <https://github.com/nchammas/flintrock#motivation> with spark-ec2. It's still very young and lacks several critical features, but we are making steady progress.
>>>
>>> Nick
>>>
>>> On Thu, Nov 5, 2015 at 12:30 PM Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:
>>>
>>>> It is a known limitation that spark-ec2 is very slow for large clusters, and as you mention most of this is due to the use of rsync to transfer things from the master to all the slaves.
>>>>
>>>> Nick, cc'd, has been working on an alternative approach at https://github.com/nchammas/flintrock that is more scalable.
>>>>
>>>> Thanks
>>>> Shivaram
>>>>
>>>> On Thu, Nov 5, 2015 at 8:12 AM, Christian <engr...@gmail.com> wrote:
>>>> > For starters, thanks for the awesome product!
>>>> >
>>>> > When creating ec2-clusters of 20-40 nodes, things work great. When we create a cluster with the provided spark-ec2 script, it takes hours. When creating a 200 node cluster, it takes 2 1/2 hours, and for a 500 node cluster it takes over 5 hours. One other problem we are having is that some nodes don't come up when the other ones do; the process seems to just move on, skipping the rsync and any installs on those ones.
>>>> >
>>>> > My guess as to why it takes so long to set up a large cluster is because of the use of rsync. What if instead of using rsync, you synced to s3 and then did a pdsh to pull it down on all of the machines? This is a big deal for us, and if we can come up with a good plan, we might be able to help out with the required changes.
>>>> >
>>>> > Are there any suggestions on how to deal with some of the nodes not being ready when the process starts?
>>>> >
>>>> > Thanks for your time,
>>>> > Christian
Re: Spark RDD cache persistence
I've never had this need and I've never done it. There are options that allow this. For example, I know there are web apps out there that work like the Spark REPL; one of these, I think, is called Zeppelin. I've never used them, but I've seen them demoed. There is also Tachyon, which Spark supports. Hopefully, that gives you a place to start. On Thu, Nov 5, 2015 at 9:21 PM Deepak Sharma <deepakmc...@gmail.com> wrote: > Thanks Christian. > So is there any inbuilt mechanism in Spark, or API integration to other > in-memory cache products such as Redis, to load the RDD into these systems upon > program exit? > What's the best approach to have a long-lived RDD cache? > Thanks > > > Deepak > On 6 Nov 2015 8:34 am, "Christian" <engr...@gmail.com> wrote: > >> The cache gets cleared out when the job finishes. I am not aware of a way >> to keep the cache around between jobs. You could save it as an object file >> to disk and load it as an object file on your next job for speed. >> On Thu, Nov 5, 2015 at 6:17 PM Deepak Sharma <deepakmc...@gmail.com> >> wrote: >> >>> Hi All >>> I am confused on RDD persistence in cache. >>> If I cache an RDD, is it going to stay there in memory even if my spark >>> program completes execution, which created it? >>> If not, how can I guarantee that an RDD is persisted in cache even after >>> the program finishes execution. >>> >>> Thanks >>> >>> >>> Deepak >>>
Re: Spark RDD cache persistence
The cache gets cleared out when the job finishes. I am not aware of a way to keep the cache around between jobs. You could save it as an object file to disk and load it as an object file on your next job for speed. On Thu, Nov 5, 2015 at 6:17 PM Deepak Sharma wrote: > Hi All > I am confused on RDD persistence in cache. > If I cache an RDD, is it going to stay there in memory even if my spark > program completes execution, which created it? > If not, how can I guarantee that an RDD is persisted in cache even after the > program finishes execution. > > Thanks > > > Deepak >
streaming and piping to R, sending all data in window to pipe()
Spark newbie here, using Spark 1.3.1. I’m consuming a stream and trying to pipe the data from the entire window to R for analysis. The R algorithm needs the entire dataset from the stream (everything in the window) in order to function properly; it can’t be broken up. So I tried doing a coalesce(1) before calling pipe(), but it still seems to be breaking up the data and invoking R multiple times with small pieces of data. Is there some other approach I should try? Here’s a small snippet:

val inputs: DStream[String] = MQTTUtils.createStream(ssc, mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
  .window(duration)

inputs.foreachRDD { windowRdd =>
  if (windowRdd.count() > 0) processWindow(windowRdd)
}

...

def processWindow(windowRdd: RDD[String]) = {
  // call R script to process data
  windowRdd.coalesce(1)
  val outputsRdd: RDD[String] = windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
  outputsRdd.cache()
  if (outputsRdd.count() > 0) processOutputs(outputsRdd)
}

...
Re: Super slow caching in 1.3?
Michael, There is only one schema: both versions have 200 string columns in one file. On Mon, Apr 20, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote: Now this is very important: “Normal RDDs” refers to “batch RDDs”. However, the default in-memory serialization of RDDs which are part of a DStream is “Serialized” rather than actual (hydrated) objects. The Spark documentation states that “Serialization” is required for space and garbage collection efficiency (but creates higher CPU load) – which makes sense considering the large number of RDDs which get discarded in a streaming app. So what does Databricks actually recommend as an object-oriented model for RDD elements used in Spark Streaming apps – flat or not – and can you provide a detailed description / spec of both? From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Thursday, April 16, 2015 7:23 PM To: Evo Eftimov Cc: Christian Perez; user Subject: Re: Super slow caching in 1.3? Here are the types that we specialize; other types will be much slower. This is only for Spark SQL; normal RDDs do not serialize data that is cached. I'll also note that until yesterday we were missing FloatType https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154 Christian, can you provide the schema of the fast and slow datasets? On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov evo.efti...@isecc.com wrote: Michael, what exactly do you mean by flattened version/structure here, e.g.: 1. An Object with only primitive data types as attributes 2. An Object with no more than one level of other Objects as attributes 3. An Array/List of primitive types 4.
An Array/List of Objects This question is in general about RDDs, not necessarily RDDs in the context of Spark SQL. When answering, can you also score how bad the performance of each of the above options is? -Original Message- From: Christian Perez [mailto:christ...@svds.com] Sent: Thursday, April 16, 2015 6:09 PM To: Michael Armbrust Cc: user Subject: Re: Super slow caching in 1.3? Hi Michael, Good question! We checked 1.2 and found that it is also slow caching the same flat parquet file. Caching other file formats of the same data was faster by up to a factor of ~2. Note that the parquet file was created in Impala but the other formats were written by Spark SQL. Cheers, Christian On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote: Do you think you are seeing a regression from 1.2? Also, are you caching nested data or flat rows? The in-memory caching is not really designed for nested data and so performs pretty slowly here (it's just falling back to Kryo and even then there are some locking issues). If so, would it be possible to try caching a flattened version? CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote: Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off...
Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Pyspark where do third parties libraries need to be installed under Yarn-client mode
To run MLlib, you only need numpy on each node. For additional dependencies, you can call spark-submit with the --py-files option and add the .zip or .egg. https://spark.apache.org/docs/latest/submitting-applications.html Cheers, Christian On Fri, Apr 24, 2015 at 1:56 AM, Hoai-Thu Vuong thuv...@gmail.com wrote: I use sudo pip install ... on each machine in the cluster, and don't have to think about how to submit the library. On Fri, Apr 24, 2015 at 4:21 AM dusts66 dustin.davids...@gmail.com wrote: I am trying to figure out Python library management. So my question is: Where do third party Python libraries (ex. numpy, scipy, etc.) need to exist if I am running a spark job via 'spark-submit' against my cluster in 'yarn client' mode? Do the libraries need to only exist on the client (i.e. the server executing the driver code) or do the libraries need to exist on the datanode/worker nodes where the tasks are executed? The documentation seems to indicate that under 'yarn client' the libraries are only needed on the client machine, not the entire cluster. If the libraries are needed across all cluster machines, any suggestions on a deployment strategy or dependency management model that works well? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-where-do-third-parties-libraries-need-to-be-installed-under-Yarn-client-mode-tp22639.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
Re: MLlib - Collaborative Filtering - trainImplicit task size
All these warnings come from ALS iterations, from flatMap and also from aggregate; for instance, the origin of the stage where the flatMap is showing these warnings (w/ Spark 1.3.0; they are also shown in Spark 1.3.1): org.apache.spark.rdd.RDD.flatMap(RDD.scala:296) org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065) org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530) org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527) scala.collection.immutable.Range.foreach(Range.scala:141) org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527) org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203) And from the aggregate: org.apache.spark.rdd.RDD.aggregate(RDD.scala:968) org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112) org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064) org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538) org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527) scala.collection.immutable.Range.foreach(Range.scala:141) org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527) org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203) On Thu, Apr 23, 2015 at 2:49 AM, Xiangrui Meng men...@gmail.com wrote: This is the size of the serialized task closure. Is stage 246 part of ALS iterations, or something before or after it? -Xiangrui On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone christian.per...@gmail.com wrote: Hi Sean, thanks for the answer. I tried to call repartition() on the input with many different sizes and it still continues to show that warning message. On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote: I think maybe you need more partitions in your input, which might make for smaller tasks? On Tue, Apr 21, 2015 at 2:56 AM, Christian S.
Perone christian.per...@gmail.com wrote: I keep seeing these warnings when using trainImplicit: WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. And then the task size starts to increase. Is this a known issue ? Thanks ! -- Blog | Github | Twitter Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me. -- Blog | Github | Twitter Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me. -- Blog http://blog.christianperone.com | Github https://github.com/perone | Twitter https://twitter.com/tarantulae Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me.
MLlib - Collaborative Filtering - trainImplicit task size
I keep seeing these warnings when using trainImplicit: WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. And then the task size starts to increase. Is this a known issue? Thanks! -- Blog http://blog.christianperone.com | Github https://github.com/perone | Twitter https://twitter.com/tarantulae Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me.
Re: MLlib - Collaborative Filtering
The easiest way to do that is to use a similarity metric between the different user factors. On Sat, Apr 18, 2015 at 7:49 AM, riginos samarasrigi...@gmail.com wrote: Is there any way that I can see the similarity table of 2 users in that algorithm? By that I mean the similarity between 2 users. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Collaborative-Filtering-tp22553.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Blog http://blog.christianperone.com | Github https://github.com/perone | Twitter https://twitter.com/tarantulae Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me.
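The similarity-metric idea can be sketched with plain numpy: take two users' latent factor vectors (in MLlib these would come from the trained model's userFeatures(); the vectors below are made up for illustration) and compare them with cosine similarity.

```python
import numpy as np

def cosine_sim(u, v):
    """Cosine similarity between two latent factor vectors."""
    u, v = np.asarray(u, dtype=float), np.asarray(v, dtype=float)
    return float(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v)))

# Hypothetical ALS user factors, e.g. model.userFeatures().lookup(a)[0]
user_a = [0.5, 1.2, -0.3]
user_b = [0.4, 1.0, -0.2]
print(cosine_sim(user_a, user_b))  # close to 1.0 => similar users
```

Computing this for every user pair gives the "similarity table" riginos asks about, though for many users an approximate scheme (e.g. comparing only top candidates) is usually needed.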
Re: Super slow caching in 1.3?
Hi Michael, Good question! We checked 1.2 and found that it is also slow caching the same flat parquet file. Caching other file formats of the same data was faster by up to a factor of ~2. Note that the parquet file was created in Impala but the other formats were written by Spark SQL. Cheers, Christian On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote: Do you think you are seeing a regression from 1.2? Also, are you caching nested data or flat rows? The in-memory caching is not really designed for nested data and so performs pretty slowly here (it's just falling back to Kryo and even then there are some locking issues). If so, would it be possible to try caching a flattened version? CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote: Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Super slow caching in 1.3?
Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: persist(MEMORY_ONLY) takes lot of time
+1. Caching is way too slow. On Wed, Apr 1, 2015 at 12:33 PM, SamyaMaiti samya.maiti2...@gmail.com wrote: Hi Experts, I have a parquet dataset of 550 MB (9 blocks) in HDFS. I want to run SQL queries repetitively. A few questions: 1. When I do the below (persist to memory after reading from disk), it takes a lot of time to persist to memory; any suggestions on how to tune this? val inputP = sqlContext.parquetFile(some HDFS path) inputP.registerTempTable(sample_table) inputP.persist(MEMORY_ONLY) val result = sqlContext.sql(some sql query) result.count Note: Once the data is persisted to memory, it takes a fraction of a second to return query results from the second query onwards. So my concern is how to reduce the time when the data is first loaded to cache. 2. I have observed that if I omit the line inputP.persist(MEMORY_ONLY), the first query execution is comparatively quick (say it takes 1 min), as the load-to-memory time is saved, but to my surprise the second time I run the same query it takes 30 sec, as the inputP is not constructed from disk (checked from the UI). So my question is: does Spark use some kind of internal caching for inputP in this scenario? Thanks in advance Regards, Sam -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/persist-MEMORY-ONLY-takes-lot-of-time-tp22343.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
Re: input size too large | Performance issues with Spark
To Akhil's point, see the "Tuning Data Structures" section; avoid standard collection HashMaps. With fewer machines, try running 4 or 5 cores per executor and only 3-4 executors (1 per node): http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/. That ought to reduce the shuffle performance hit (someone else confirm?). For #7, see spark.sql.shuffle.partitions (default: 200). On Sun, Mar 29, 2015 at 7:57 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Go through this once, if you haven't read it already. https://spark.apache.org/docs/latest/tuning.html Thanks Best Regards On Sat, Mar 28, 2015 at 7:33 PM, nsareen nsar...@gmail.com wrote: Hi All, I'm facing performance issues with my Spark implementation, and was briefly investigating the Web UI; I noticed that my RDD size is 55 GB, the shuffle write is 10 GB, and the input size is 200 GB. The application is a web application which does predictive analytics, so we keep most of our data in memory. This observation was only for 30 minutes' usage of the application by a single user. We anticipate at least 10-15 users of the application sending requests in parallel, which makes me a bit nervous. One constraint we have is that we do not have too many nodes in a cluster; we may end up with 3-4 machines at best, but they can be scaled up vertically, each having 24 cores / 512 GB RAM etc., which can allow us to make a virtual 10-15 node cluster. Even then the input size / shuffle write is too high for my liking. Any suggestions in this regard will be greatly appreciated, as there aren't many resources on the net for handling performance issues such as these. Some pointers on my application's data structures design: 1) The RDD is a JavaPairRDD with Key / Value as CustomPOJO containing 3-4 Hashmaps Value containing 1 Hashmap 2) Data is loaded via JDBCRDD during application startup, which also tends to take a lot of time, since we massage the data once it is fetched from the DB and then save it as JavaPairRDD.
3) Most of the data is structured, but we are still using JavaPairRDD; we have not explored the option of Spark SQL though. 4) We have only one SparkContext which caters to all the requests coming into the application from various users. 5) During a single user session a user can send 3-4 parallel stages consisting of Map / Group By / Join / Reduce etc. 6) We have to change the RDD structure using different types of group-by operations since the user can do drill-down/drill-up of the data (aggregation at a higher/lower level). This is where we make use of group-bys, but there is a cost associated with this. 7) We have observed that the initial RDDs we create have 40-odd partitions, but after some stage executions like group-bys the partitions increase to 200 or so; this was odd, and we haven't figured out why this happens. In summary we want to use Spark to give us the capability to process our in-memory data structure very fast as well as scale to a larger volume when required in the future. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/input-size-too-large-Performance-issues-with-Spark-tp22270.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
Re: saveAsTable broken in v1.3 DataFrames?
Any other users interested in a feature DataFrame.saveAsExternalTable() for making _useful_ external tables in Hive, or am I the only one? Bueller? If I start a PR for this, will it be taken seriously? On Thu, Mar 19, 2015 at 9:34 AM, Christian Perez christ...@svds.com wrote: Hi Yin, Thanks for the clarification. My first reaction is that if this is the intended behavior, it is a wasted opportunity. Why create a managed table in Hive that cannot be read from inside Hive? I think I understand now that you are essentially piggybacking on Hive's metastore to persist table info between/across sessions, but I imagine others might expect more (as I have.) We find ourselves wanting to do work in Spark and persist the results where other users (e.g. analysts using Tableau connected to Hive/Impala) can explore it. I imagine this is very common. I can, of course, save it as parquet and create an external table in hive (which I will do now), but saveAsTable seems much less useful to me now. Any other opinions? Cheers, C On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai yh...@databricks.com wrote: I meant table properties and serde properties are used to store metadata of a Spark SQL data source table. We do not set other fields like SerDe lib. For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table should not show unrelated stuff like Serde lib and InputFormat. I have created https://issues.apache.org/jira/browse/SPARK-6413 to track the improvement on the output of DESCRIBE statement. On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote: Hi Christian, Your table is stored correctly in Parquet format. For saveAsTable, the table created is not a Hive table, but a Spark SQL data source table (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources). We are only using Hive's metastore to store the metadata (to be specific, only table properties and serde properties). 
When you look at table property, there will be a field called spark.sql.sources.provider and the value will be org.apache.spark.sql.parquet.DefaultSource. You can also look at your files in the file system. They are stored by Parquet. Thanks, Yin On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com wrote: Hi all, DataFrame.saveAsTable creates a managed table in Hive (v0.13 on CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong* schema _and_ storage format in the Hive metastore, so that the table cannot be read from inside Hive. Spark itself can read the table, but Hive throws a Serialization error because it doesn't know it is Parquet. val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income") df.saveAsTable("spark_test_foo") Expected: COLUMNS( education BIGINT, income BIGINT ) SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat Actual: COLUMNS( col array<string> COMMENT 'from deserializer' ) SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat --- Manually changing schema and storage restores access in Hive and doesn't affect Spark. Note also that Hive's table property spark.sql.sources.schema is correct. At first glance, it looks like the schema data is serialized when sent to Hive but not deserialized properly on receive. I'm tracing execution through source code... but before I get any deeper, can anyone reproduce this behavior?
Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
saveAsTable broken in v1.3 DataFrames?
Hi all, DataFrame.saveAsTable creates a managed table in Hive (v0.13 on CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong* schema _and_ storage format in the Hive metastore, so that the table cannot be read from inside Hive. Spark itself can read the table, but Hive throws a Serialization error because it doesn't know it is Parquet. val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income") df.saveAsTable("spark_test_foo") Expected: COLUMNS( education BIGINT, income BIGINT ) SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat Actual: COLUMNS( col array<string> COMMENT 'from deserializer' ) SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat --- Manually changing schema and storage restores access in Hive and doesn't affect Spark. Note also that Hive's table property spark.sql.sources.schema is correct. At first glance, it looks like the schema data is serialized when sent to Hive but not deserialized properly on receive. I'm tracing execution through source code... but before I get any deeper, can anyone reproduce this behavior? Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: saveAsTable broken in v1.3 DataFrames?
Hi Yin, Thanks for the clarification. My first reaction is that if this is the intended behavior, it is a wasted opportunity. Why create a managed table in Hive that cannot be read from inside Hive? I think I understand now that you are essentially piggybacking on Hive's metastore to persist table info between/across sessions, but I imagine others might expect more (as I have.) We find ourselves wanting to do work in Spark and persist the results where other users (e.g. analysts using Tableau connected to Hive/Impala) can explore it. I imagine this is very common. I can, of course, save it as parquet and create an external table in hive (which I will do now), but saveAsTable seems much less useful to me now. Any other opinions? Cheers, C On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai yh...@databricks.com wrote: I meant table properties and serde properties are used to store metadata of a Spark SQL data source table. We do not set other fields like SerDe lib. For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table should not show unrelated stuff like Serde lib and InputFormat. I have created https://issues.apache.org/jira/browse/SPARK-6413 to track the improvement on the output of DESCRIBE statement. On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote: Hi Christian, Your table is stored correctly in Parquet format. For saveAsTable, the table created is not a Hive table, but a Spark SQL data source table (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources). We are only using Hive's metastore to store the metadata (to be specific, only table properties and serde properties). When you look at table property, there will be a field called spark.sql.sources.provider and the value will be org.apache.spark.sql.parquet.DefaultSource. You can also look at your files in the file system. They are stored by Parquet. 
Thanks, Yin On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com wrote: Hi all, DataFrame.saveAsTable creates a managed table in Hive (v0.13 on CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong* schema _and_ storage format in the Hive metastore, so that the table cannot be read from inside Hive. Spark itself can read the table, but Hive throws a Serialization error because it doesn't know it is Parquet. val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income") df.saveAsTable("spark_test_foo") Expected: COLUMNS( education BIGINT, income BIGINT ) SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat Actual: COLUMNS( col array<string> COMMENT 'from deserializer' ) SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat --- Manually changing schema and storage restores access in Hive and doesn't affect Spark. Note also that Hive's table property spark.sql.sources.schema is correct. At first glance, it looks like the schema data is serialized when sent to Hive but not deserialized properly on receive. I'm tracing execution through source code... but before I get any deeper, can anyone reproduce this behavior? Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Pyspark Hbase scan.
Sorry, forgot to attach the traceback. Regards Rene Castberg From: Castberg, René Christian Sent: 13 March 2015 07:13 To: user@spark.apache.org Cc: gen tang Subject: Re: Pyspark Hbase scan. Hi, I have now successfully managed to test this in a local Spark session, but I am having a huge problem getting this to work with the Hortonworks technical preview. I think that there is an incompatibility with the way YARN has been compiled. After changing the hbase version and adding: resolvers += "Hortonworks Releases" at "http://repo.hortonworks.com/content/repositories/releases/" I get the attached traceback. Any help in how to compile this jar such that it works would be greatly appreciated. Regards Rene Castberg From: gen tang gen.tan...@gmail.com Sent: 5 February 2015 11:38 To: Castberg, René Christian Cc: user@spark.apache.org Subject: Re: Pyspark Hbase scan. Hi, In fact, this pull request https://github.com/apache/spark/pull/3920 is to do an HBase scan. However, it is not merged yet. You can also take a look at the example code at http://spark-packages.org/package/20 which uses Scala and Python to read data from HBase. Hope this can be helpful. Cheers Gen On Thu, Feb 5, 2015 at 11:11 AM, Castberg, René Christian rene.castb...@dnvgl.com wrote: Hi, I am trying to do an HBase scan and read it into a Spark RDD using PySpark. I have successfully written data to HBase from PySpark, and been able to read a full table from HBase using the Python example code. Unfortunately I am unable to find any example code for doing an HBase scan and reading it into a Spark RDD from PySpark. I have found a Scala example: http://stackoverflow.com/questions/25189527/how-to-process-a-range-of-hbase-rows-using-spark But I can't find anything on how to do this from Python. Can anybody shed some light on how (and if) this can be done? Regards Rene Castberg
** This e-mail and any attachments thereto may contain confidential information and/or information protected by intellectual property rights for the exclusive attention of the intended addressees named above. If you have received this transmission in error, please immediately notify the sender by return e-mail and delete this message and its attachments. Unauthorized use, copying or further full or partial distribution of this e-mail or its contents is prohibited. **

$ /hadoop-dist/spark-1.2.1-bin-hadoop2.4/bin/spark-submit --driver-class-path /usr/hdp/current/share/lzo/0.6.0/lib/hadoop-lzo-0.6.0.jar:/home/recast/spark_hbase/target/scala-2.10/spark_hbase-assembly-1.0.jar --jars /hadoop-dist/spark-1.2.1-bin-hadoop2.4/lib/spark-examples-1.2.1-hadoop2.4.0.jar --driver-library-path /usr/hdp/current/share/lzo/0.6.0/lib/native/Linux-amd64-64/ AIS_count_msb_hbase.py Spark assembly has been built with Hive, including Datanucleus jars on classpath 2.7.9 (default, Feb 25 2015, 14:55:10) [GCC 4.4.7 20120313 (Red Hat 4.4.7-11)] /hadoop-dist/Python/lib/python2.7/site-packages/setuptools-12.3-py2.7.egg/pkg_resources/__init__.py:1224: UserWarning: /tmp/python-eggs is writable by group/others and vulnerable to attack when used with get_resource_filename. Consider a more secure location (set with .set_extraction_path or the PYTHON_EGG_CACHE environment variable). Reading config file for : smalldata01.hdp SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/recast/spark_hbase/target/scala-2.10/spark_hbase-assembly-1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/hadoop-dist/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/03/13 06:10:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/13 06:10:34 WARN YarnClientSchedulerBackend: NOTE: SPARK_WORKER_INSTANCES is deprecated. Use
New guide on how to write a Spark job in Clojure
Hi all, Maybe some of you are interested: I wrote a new guide on how to start using Spark from Clojure. The tutorial covers
* setting up a project,
* doing REPL- or test-driven development of Spark jobs,
* running Spark jobs locally.
Just read it at https://gorillalabs.github.io/sparkling/articles/tfidf_guide.html. Comments (and pull requests) are very welcome. Sincerely, Chris
Re: Datastore HDFS vs Cassandra
Hi, Regarding the Cassandra data model, there's an excellent post on the eBay tech blog: http://www.ebaytechblog.com/2012/07/16/cassandra-data-modeling-best-practices-part-1/. There's also a slideshare for this somewhere. Happy hacking, Chris

From: Franc Carter franc.car...@rozettatech.com Date: Wednesday, 11 February 2015 10:03 To: Paolo Platter paolo.plat...@agilelab.it Cc: Mike Trienis mike.trie...@orcsol.com, user@spark.apache.org Subject: Re: Datastore HDFS vs Cassandra

One additional comment I would make is that you should be careful with updates in Cassandra. It does support them, but large amounts of updates (i.e. changing existing keys) tend to cause fragmentation. If you are (mostly) adding new keys (e.g. new records in the time series) then Cassandra can be excellent. Cheers

On Wed, Feb 11, 2015 at 6:13 PM, Paolo Platter paolo.plat...@agilelab.it wrote: Hi Mike, I developed a solution with Cassandra and Spark, using DSE. The main difficulty is with Cassandra: you need to understand its data model and its query patterns very well. Cassandra has better performance than HDFS, and it has DR and stronger availability. HDFS is a filesystem; Cassandra is a DBMS. Cassandra supports full CRUD without ACID. HDFS is more flexible than Cassandra. In my opinion, if you have real time-series data, go with Cassandra, paying attention to your reporting data access patterns. Paolo

Sent from my Windows Phone

From: Mike Trienis mike.trie...@orcsol.com Sent: 11/02/2015 05:59 To: user@spark.apache.org Subject: Datastore HDFS vs Cassandra

Hi, I am considering implementing Apache Spark on top of the Cassandra database after listening to a related talk and reading through the slides from DataStax.
It seems to fit well with our time-series data and reporting requirements. http://www.slideshare.net/patrickmcfadin/apache-cassandra-apache-spark-for-time-series-data Does anyone have any experience using Apache Spark and Cassandra, including limitations (and/or) technical difficulties? How does Cassandra compare with HDFS, and what use cases would make HDFS more suitable? Thanks, Mike. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Datastore-HDFS-vs-Cassandra-tp21590.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Franc Carter | Systems Architect | Rozetta Technology franc.car...@rozettatech.com | www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
Pyspark Hbase scan.
Hi, I am trying to do an HBase scan and read it into a Spark RDD using pyspark. I have successfully written data to HBase from pyspark, and been able to read a full table from HBase using the Python example code. Unfortunately I am unable to find any example code for doing an HBase scan and reading it into a Spark RDD from pyspark. I have found a Scala example: http://stackoverflow.com/questions/25189527/how-to-process-a-range-of-hbase-rows-using-spark But I can't find anything on how to do this from Python. Can anybody shed some light on how (and if) this can be done? Regards, Rene Castberg
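Since the thread never arrives at working Python code, here is a hedged sketch of how a bounded scan could be expressed from pyspark. The configuration keys are the ones HBase's TableInputFormat reads; the quorum host, table name, and row keys are made-up placeholders. The newAPIHadoopRDD call itself is left as a comment because it needs a live cluster plus the Spark examples jar (the converter class names follow the stock pyspark HBase example).

```python
# Hypothetical sketch: scan bounds for HBase's TableInputFormat are passed
# to pyspark as plain Hadoop configuration keys in a dict.
conf = {
    "hbase.zookeeper.quorum": "localhost",          # assumed quorum host
    "hbase.mapreduce.inputtable": "my_table",       # assumed table name
    "hbase.mapreduce.scan.row.start": "row-00100",  # scan start row key
    "hbase.mapreduce.scan.row.stop": "row-00200",   # scan stop row key
}

# With a SparkContext sc and the Spark examples jar on the classpath,
# the read would look roughly like (not runnable without a cluster):
#
# rdd = sc.newAPIHadoopRDD(
#     "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
#     "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
#     "org.apache.hadoop.hbase.client.Result",
#     keyConverter="org.apache.spark.examples.pythonconverters."
#                  "ImmutableBytesWritableToStringConverter",
#     valueConverter="org.apache.spark.examples.pythonconverters."
#                    "HBaseResultToStringConverter",
#     conf=conf)
```

Only rows with keys in [start, stop) would be handed to Spark, which is the pruning the full-table example code lacks.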
Re: Spark 1.1 / cdh4 stuck using old hadoop client?
Is 1.0.8 working for you? You indicated your last known good version is 1.0.0. Maybe we can track down where it broke.

On Sep 16, 2014, at 12:25 AM, Paul Wais pw...@yelp.com wrote: Thanks Christian! I tried compiling from source but am still getting the same hadoop client version error when reading from HDFS. Will have to poke deeper... perhaps I've got some classpath issues. FWIW I compiled using:

$ MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" mvn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

and hadoop 2.3 / cdh5 from http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz

On Mon, Sep 15, 2014 at 6:49 PM, Christian Chua cc8...@icloud.com wrote: Hi Paul. I would recommend building your own 1.1.0 distribution:

./make-distribution.sh --name hadoop-personal-build-2.4 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests

I downloaded the "Pre-built for Hadoop 2.4" binary, and it had this strange behavior where spark-submit --master yarn-cluster ... will work, but spark-submit --master yarn-client ... will fail. With the personal build obtained from the command above, both work. -Christian

On Sep 15, 2014, at 6:28 PM, Paul Wais pw...@yelp.com wrote: Dear List, I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for reading SequenceFiles. In particular, I'm seeing:

Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4
    at org.apache.hadoop.ipc.Client.call(Client.java:1070)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
    ...

when invoking JavaSparkContext#newAPIHadoopFile().
(With args validSequenceFileURI, SequenceFileInputFormat.class, Text.class, BytesWritable.class, new Job().getConfiguration() -- pretty close to the unit test here: https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916 ) This error indicates to me that Spark is using an old hadoop client to do reads. Oddly I'm able to do /writes/ OK, i.e. I'm able to write via JavaPairRdd#saveAsNewAPIHadoopFile() to my HDFS cluster. Do I need to explicitly build Spark for modern Hadoop? I previously had an HDFS cluster running hadoop 2.3.0 and I was getting a similar error (server is using version 9, client is using version 4). I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted on Spark's site: * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz What distro of hadoop is used at Databricks? Are there distros of Spark 1.1 and Hadoop that should work together out-of-the-box? (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine.) Thanks for any help anybody can give me here! -Paul
K-NN by efficient sparse matrix product
Hi, I'm new to Spark and Hadoop, and I'd like to know if the following problem is solvable in terms of Spark's primitives. To compute the K-nearest neighbours of an N-dimensional dataset, I can multiply my very large normalized sparse matrix by its transpose. As this yields all pairwise distance values (N x N), I can then sort each row and only keep the K highest elements for each, resulting in an N x K dense matrix. As this Quora answer suggests: http://qr.ae/v03lY rather than the row-wise dot product, which would be O(N^2), it's better to compute the sum of the column outer products, which is O(N x K^2). However, given the number of non-zero elements in the resulting matrix, it seems I could not afford to first perform the full multiplication (N x N) and then prune it afterward (N x K), so I need a way to prune it on the fly. The original algorithm I came up with is roughly this, for an input matrix M:

    for each row i:
        outer_i = [0] * N
        for j in nonzero elements of row i:
            for k in nonzero elements of col j:
                outer_i[k] += M[i][j] * M[k][j]
        nearest_i = {sort outer_i and keep best K}

which can be parallelized in an embarrassing way, i.e. each compute node can simply process a slice of the rows. Would there be a way to do something similar (or related) with Spark? Christian
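For reference, the pseudocode above translates into plain single-machine Python almost directly. The sketch below (all names are mine, not from the thread) keeps a column index so both inner loops touch only non-zeros, and swaps the dense [0] * N accumulator for a sparse one:

```python
import heapq
from collections import defaultdict

def knn_outer(rows, K):
    """rows[i] is a dict {j: M[i][j]} holding row i's non-zero entries."""
    # Column index: cols[j] lists the (k, M[k][j]) pairs of column j,
    # so "nonzero elements of col j" becomes a direct lookup.
    cols = defaultdict(list)
    for k, row in enumerate(rows):
        for j, v in row.items():
            cols[j].append((k, v))

    nearest = []
    for i, row in enumerate(rows):
        outer_i = defaultdict(float)        # sparse accumulator, not [0] * N
        for j, vij in row.items():          # nonzero elements of row i
            for k, vkj in cols[j]:          # nonzero elements of col j
                outer_i[k] += vij * vkj
        outer_i.pop(i, None)                # drop the trivial self-match
        # "sort outer_i and keep best K", without sorting all N entries
        nearest.append(heapq.nlargest(K, outer_i.items(), key=lambda kv: kv[1]))
    return nearest

# Toy example: row 0 overlaps row 1 on column 0 and row 2 on column 1.
M = [{0: 1.0, 1: 2.0}, {0: 1.0}, {1: 1.0}]
print([nbrs[0][0] for nbrs in knn_outer(M, K=1)])  # → [2, 0, 0]
```

Each iteration of the outer loop depends only on row i and the shared column index, which is what makes slicing the rows across compute nodes embarrassingly parallel.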
Re: K-NN by efficient sparse matrix product
Thank you for your answer. Would you have by any chance some example code (even fragmentary) that I could study?

On 28 May 2014 14:04, Tom Vacek minnesota...@gmail.com wrote: Maybe I should add: if you can hold the entire matrix in memory, then this is embarrassingly parallel. If not, then the complications arise.

On Wed, May 28, 2014 at 1:00 PM, Tom Vacek minnesota...@gmail.com wrote: The problem with matrix multiplication is that the amount of data blows up between the mapper and the reducer, and the shuffle operation is very slow. I have not ever tried this, but the shuffle can be avoided by making use of broadcast. Say we have M = L*R. We do a column decomposition on R, and we collect rows of L to the master and broadcast them (in manageably-sized blocks). Each worker does a dot product and discards the row block when finished. In theory, this has complexity max(nnz(L)*log p, nnz(L)*n/p). I have to warn though: when I played with matrix multiplication, I was getting nowhere near serial performance.

On Wed, May 28, 2014 at 11:00 AM, Christian Jauvin cjau...@gmail.com wrote: Hi, I'm new to Spark and Hadoop, and I'd like to know if the following problem is solvable in terms of Spark's primitives. To compute the K-nearest neighbours of an N-dimensional dataset, I can multiply my very large normalized sparse matrix by its transpose. As this yields all pairwise distance values (N x N), I can then sort each row and only keep the K highest elements for each, resulting in an N x K dense matrix. As this Quora answer suggests: http://qr.ae/v03lY rather than the row-wise dot product, which would be O(N^2), it's better to compute the sum of the column outer products, which is O(N x K^2). However, given the number of non-zero elements in the resulting matrix, it seems I could not afford to first perform the full multiplication (N x N) and then prune it afterward (N x K), so I need a way to prune it on the fly.
The original algorithm I came up with is roughly this, for an input matrix M:

    for each row i:
        outer_i = [0] * N
        for j in nonzero elements of row i:
            for k in nonzero elements of col j:
                outer_i[k] += M[i][j] * M[k][j]
        nearest_i = {sort outer_i and keep best K}

which can be parallelized in an embarrassing way, i.e. each compute node can simply process a slice of the rows. Would there be a way to do something similar (or related) with Spark? Christian
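Tom's broadcast scheme can be illustrated with a toy, single-process version (not Spark code; names and block sizes are invented for illustration). R is column-decomposed across simulated workers, and row blocks of L are handed to each worker in turn, so no shuffle of intermediate products would be required:

```python
# Toy illustration of the shuffle-free scheme for M = L * R:
# each "worker" owns a column slice of R; row blocks of L are sent
# ("broadcast") to every worker, which multiplies and then discards them.

def matmul_broadcast(L, R, row_block=2, n_workers=2):
    n, m = len(L), len(R[0])
    # Column decomposition of R: worker w owns columns w, w + n_workers, ...
    col_slices = [list(range(w, m, n_workers)) for w in range(n_workers)]
    M = [[0.0] * m for _ in range(n)]
    for start in range(0, n, row_block):
        block = L[start:start + row_block]     # the broadcast row block
        for cols in col_slices:                # each simulated worker in turn
            for bi, row in enumerate(block):
                i = start + bi
                for j in cols:                 # dot products on owned columns
                    M[i][j] = sum(row[k] * R[k][j] for k in range(len(R)))
    return M

L = [[1, 2], [3, 4], [5, 6]]
R = [[7, 8], [9, 10]]
print(matmul_broadcast(L, R))  # → [[25, 28], [57, 64], [89, 100]]
```

The point of the structure is that each worker only ever holds its own column slice plus one transient row block, mirroring how a Spark broadcast variable would be consumed and released.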
Re: pyspark and Python virtual enviroments
Thanks, Bryn.

On Wed, Mar 5, 2014 at 9:00 PM, Bryn Keller xol...@xoltar.org wrote: Hi Christian, The PYSPARK_PYTHON environment variable specifies the Python executable to use for pyspark. You can put the path to a virtualenv's Python executable there and it will work fine. Remember you have to have the same installation at the same path on each of your cluster nodes for pyspark to work. If you're creating the Spark context yourself in a Python application, you can use os.environ['PYSPARK_PYTHON'] = sys.executable before creating your Spark context. Hope that helps, Bryn

On Wed, Mar 5, 2014 at 4:54 AM, Christian chri...@gmail.com wrote: Hello, I usually create different Python virtual environments for different projects to avoid version conflicts and to skip the requirement of being root to install libs. How can I tell pyspark to activate a virtual environment before executing its tasks? Further info on virtual envs: http://virtualenv.readthedocs.org/en/latest/virtualenv.html Thanks in advance, Christian
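Both of Bryn's options amount to setting one environment variable before the SparkContext exists. A minimal sketch (the virtualenv path is an assumed example, not a real cluster path):

```python
import os
import sys

# Option 1: point pyspark at a virtualenv's interpreter before launch.
# The same path must exist on every cluster node (assumed path below):
os.environ["PYSPARK_PYTHON"] = "/opt/venvs/myproject/bin/python"

# Option 2: when creating the SparkContext yourself from inside an
# already-activated virtualenv, reuse the running interpreter:
os.environ["PYSPARK_PYTHON"] = sys.executable
# ... then: from pyspark import SparkContext; sc = SparkContext(...)
```

The variable must be set before the context is created, since pyspark reads it when it launches the worker Python processes.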