Re: SPARK Issue in Standalone cluster
> The general idea of writing to the user group is that people who know should
> answer, and not those who do not know.

I’d also add that if you’re going to write to the user group, you should be polite to people who try to answer your queries, even if you think they’re wrong. This is especially true if the people you think are wrong are actually correct.

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

> On Aug 2, 2017, at 6:25 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
> Hi,
>
> I am definitely sure that at this point of time everyone who has kindly cared to respond to my query does need to go and check this link: https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode
>
> It does mention that a SPARK standalone cluster can have multiple machines running as slaves.
>
> The general idea of writing to the user group is that people who know should answer, and not those who do not know.
>
> Regards,
> Gourav Sengupta
>
> On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker <mahesh_sawai...@persistent.com> wrote:
>
> Gourav,
>
> Riccardo’s answer is spot on. What is happening is that one node of Spark is writing to its own directory and telling a slave to read the data from there; when the slave goes to read it, the part is not found.
>
> Check the folder Users/gouravsengupta/Development/spark/sparkdata/test1/part-1-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet on the slave.
>
> The reason it ran on Spark 1.5 may have been because the executor ran on the driver itself. There is not much use to a setup where you don’t have some kind of distributed file system, so I would encourage you to use HDFS, or a mounted file system shared by all nodes.
> Regards,
> Mahesh
>
> From: Gourav Sengupta [mailto:gourav.sengu...@gmail.com]
> Sent: Monday, July 31, 2017 9:54 PM
> To: Riccardo Ferrari
> Cc: user
> Subject: Re: SPARK Issue in Standalone cluster
>
> Hi Riccardo,
>
> I am grateful for your kind response. Also I am sure that your answer is completely wrong and erroneous. SPARK must have a method so that different executors do not pick up the same files to process. You also did not answer the question of why the processing was successful in SPARK 1.5 and not in SPARK 2.2.
>
> Also, the exact same directory is present across both the nodes.
>
> I feel quite fascinated when individuals respond before even understanding the issue, or trying out the code.
>
> It will be of great help if someone could kindly read my email and help me figure out the issue.
>
> Regards,
> Gourav Sengupta
>
> On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari <ferra...@gmail.com> wrote:
>
> Hi Gourav,
>
> The issue here is the location where you're trying to write to/read from: /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
>
> When dealing with clusters, all the paths and resources should be available to all executors (and the driver), and that is the reason why you generally use HDFS, S3, NFS, or any shared file system.
>
> Spark assumes your data is generally available to all nodes and does not try to pick up the data from a selected node; rather, it tries to write/read in parallel from the executor nodes. Also, given its control logic, there is no way (read: you should not care) to know which executor is doing which task.
> Hope it helps,
> Riccardo
>
> On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
> Hi,
>
> I am working by creating a native SPARK standalone cluster (https://spark.apache.org/docs/2.2.0/spark-standalone.html). Therefore I do not have an HDFS.
>
> EXERCISE:
> It's the most fundamental and simple exercise. Create a sample SPARK dataframe, then write it to a location, and then read it back.
>
> SETTINGS:
> So after I have installed SPARK in two phys
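To illustrate Mahesh's and Riccardo's point outside of Spark: each executor resolves a local path against its own machine's disk, so parts written on one node are simply absent on the others. The toy Python below models two nodes as two directories; it is an illustrative sketch, not Spark code, and all names (directories, the part-file name) are made up.

```python
import os
import tempfile

# Each "node" has its own local filesystem, modeled here as a directory.
node_a = tempfile.mkdtemp(prefix="node_a_")
node_b = tempfile.mkdtemp(prefix="node_b_")

def write_partition(node_root, rel_path, data):
    """An executor writes its output partition to ITS OWN local disk."""
    full = os.path.join(node_root, rel_path)
    os.makedirs(os.path.dirname(full), exist_ok=True)
    with open(full, "w") as f:
        f.write(data)

def read_partition(node_root, rel_path):
    """A reader on another node resolves the same path on ITS local disk."""
    with open(os.path.join(node_root, rel_path)) as f:
        return f.read()

part = "sparkdata/test1/part-00000.parquet"  # hypothetical part-file name
write_partition(node_a, part, "rows")        # executor on node A writes

try:
    read_partition(node_b, part)             # executor on node B reads -> fails
    found_on_b = True
except FileNotFoundError:
    found_on_b = False
```

A shared file system (HDFS, NFS, S3) fixes this precisely because both "node roots" then resolve to the same storage.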
Re: real world spark code
There are a number of real-world open source Spark applications in the sciences:

genomics:
github.com/bigdatagenomics/adam <-- core is Scala, has Python/R wrappers
https://github.com/broadinstitute/gatk <-- core is Java
https://github.com/hail-is/hail <-- core is Scala, mostly used through Python wrappers

neuroscience:
https://github.com/thunder-project/thunder#using-with-spark <-- PySpark

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

> On Jul 25, 2017, at 8:09 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> Continuous integration (Travis, Jenkins) and reporting on unit tests, integration tests, etc. for each source code version.
>
> On 25. Jul 2017, at 16:58, Adaryl Wakefield <adaryl.wakefi...@hotmail.com> wrote:
>
>> ci+reporting? I’ve never heard of that term before. What is that?
>>
>> Adaryl "Bob" Wakefield, MBA
>> Principal
>> Mass Street Analytics, LLC
>> 913.938.6685
>> www.massstreet.net
>> www.linkedin.com/in/bobwakefieldmba
>> Twitter: @BobLovesData
>>
>> From: Jörn Franke [mailto:jornfra...@gmail.com]
>> Sent: Tuesday, July 25, 2017 8:31 AM
>> To: Adaryl Wakefield <adaryl.wakefi...@hotmail.com>
>> Cc: user@spark.apache.org
>> Subject: Re: real world spark code
>>
>> Look for the ones that have unit and integration tests as well as CI + reporting on code quality. All the others are just toy examples. Well, they should be :)
>>
>> On 25. Jul 2017, at 01:08, Adaryl Wakefield <adaryl.wakefi...@hotmail.com> wrote:
>>
>> Anybody know of publicly available GitHub repos of real-world Spark applications written in Scala?
>>
>> Adaryl "Bob" Wakefield, MBA
>> Principal
>> Mass Street Analytics, LLC
>> 913.938.6685
>> www.massstreet.net
>> www.linkedin.com/in/bobwakefieldmba
>> Twitter: @BobLovesData
Re: newbie HDFS S3 best practices
Hard to say on #1 without knowing your application’s characteristics; for #2, we use conductor (https://github.com/BD2KGenomics/conductor) with IAM roles and .boto/.aws/credentials files.

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

> On Mar 15, 2016, at 11:45 AM, Andy Davidson <a...@santacruzintegration.com> wrote:
>
> We use the spark-ec2 script to create AWS clusters as needed (we do not use AWS EMR).
>
> 1. Will we get better performance if we copy data to HDFS before we run, instead of reading directly from S3?
> 2. What is a good way to move results from HDFS to S3?
>
> It seems like there are many ways to bulk copy to S3. Many of them require that we explicitly use the AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@ prefix in the URL. This seems like a bad idea. What would you recommend?
>
> Thanks,
>
> Andy
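A short sketch of the credentials-file approach mentioned above: keep AWS keys in an INI-style file (the layout of ~/.aws/credentials) and load them at runtime, instead of embedding AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@ inside the S3 URL, where they leak into logs, shell history, and Spark UIs. The key values and bucket path below are placeholders, not real credentials.

```python
import configparser

# Layout of an INI-style credentials file such as ~/.aws/credentials.
# Both values are placeholders for this sketch.
creds_ini = """
[default]
aws_access_key_id = AKIA_EXAMPLE
aws_secret_access_key = SECRET_EXAMPLE
"""

cfg = configparser.ConfigParser()
cfg.read_string(creds_ini)
access_key = cfg["default"]["aws_access_key_id"]
secret_key = cfg["default"]["aws_secret_access_key"]

# The URL itself then carries no secrets; credentials travel out of band
# (e.g. via Hadoop fs.s3a.* settings or an IAM instance role).
clean_url = "s3n://mybucket/path/output"  # hypothetical bucket and path
```

With IAM roles on EC2, even the credentials file disappears: the instance metadata service supplies temporary keys, which is why the conductor + IAM-role setup above avoids the problem entirely.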
Re: which database for gene alignment data ?
Hi Roni,

> I downloaded ADAM and built it but it does not seem to list this function for the command line options. Are these exposed as a public API and can I call it from code?

These are exposed as public APIs. If you want, you can run them inside of the adam-shell (which is just a wrapper for the spark-shell, but with the ADAM libraries on the class path).

> Also, I need to save all my intermediate data. Seems like ADAM stores data in Parquet on HDFS. I want to save something in an external database, so that we can re-use the saved data in multiple ways by multiple people.

The Parquet data can be accessed via Hive, Spark SQL, Impala, etc. Additionally, from ADAM, you can export most data out to legacy genomics formats. I’m not sure though if we support that right now for feature data; those are fairly new.

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Jun 9, 2015, at 9:21 PM, roni <roni.epi...@gmail.com> wrote:

Hi Frank,
Thanks for the reply. I downloaded ADAM and built it but it does not seem to list this function for the command line options. Are these exposed as a public API so that I can call them from code?
Also, I need to save all my intermediate data. Seems like ADAM stores data in Parquet on HDFS. I want to save something in an external database, so that we can re-use the saved data in multiple ways by multiple people.
Any suggestions on the DB selection, or on keeping data centralized for use by multiple distinct groups?
Thanks
-Roni

On Mon, Jun 8, 2015 at 12:47 PM, Frank Austin Nothaft <fnoth...@berkeley.edu> wrote:

Hi Roni,

We have a full suite of genomic feature parsers that can read BED, narrowPeak, GATK interval lists, and GTF/GFF into Spark RDDs in ADAM. Additionally, we have support for efficient overlap joins (query 3 in your email below). You can load the genomic features with ADAMContext.loadFeatures. We have two tools for the overlap computation: you can use a BroadcastRegionJoin if one of the datasets you want to overlap is small, or a ShuffleRegionJoin if both datasets are large.

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Jun 8, 2015, at 9:39 PM, roni <roni.epi...@gmail.com> wrote:

Sorry for the delay. The files (called .bed files) have a format like:

Chromosome  start   end     feature  score  strand
chr1        713776  714375  peak.1   599    +
chr1        752401  753000  peak.2   599    +

The mandatory fields are:

chrom - The name of the chromosome (e.g. chr3, chrY, chr2_random) or scaffold (e.g. scaffold10671).
chromStart - The starting position of the feature in the chromosome or scaffold. The first base in a chromosome is numbered 0.
chromEnd - The ending position of the feature in the chromosome or scaffold. The chromEnd base is not included in the display of the feature. For example, the first 100 bases of a chromosome are defined as chromStart=0, chromEnd=100, and span the bases numbered 0-99.

There can be more data, as described at https://genome.ucsc.edu/FAQ/FAQformat.html#format1

Many times the use cases are like:
1. Find the features between given start and end positions.
2. Find features which have overlapping start and end points with another feature.
3. Read external (reference) data which will have a similar format (chr10 48514785 49604641 MAPK8 49514785 +) and find all the data points which are overlapping with the other .bed files.

The data is huge: .bed files can range from 0.5 GB to 5 GB (or more). I was thinking of using Cassandra, but am not sure if the overlapping queries can be supported and will be fast enough.

Thanks for the help
-Roni

On Sat, Jun 6, 2015 at 7:03 AM, Ted Yu <yuzhih...@gmail.com> wrote:

Can you describe your use case in a bit more detail, since not all people on this mailing list are familiar with gene sequencing alignment data?

Thanks

On Fri, Jun 5, 2015 at 11:42 PM, roni <roni.epi...@gmail.com> wrote:

I want to use Spark for reading compressed .bed files containing gene sequencing alignment data. I want to store the bed file data in a db and then use external gene expression data to find overlaps, etc. Which database is best for it?

Thanks
-Roni
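The overlap queries Roni lists can be stated precisely on half-open BED coordinates. Below is a minimal plain-Python sketch of queries 1-3 on tuples of (chrom, start, end, name); it is illustrative only (the "geneA" reference feature is made up), and ADAM's BroadcastRegionJoin/ShuffleRegionJoin implement the same predicate at scale rather than this quadratic loop.

```python
# BED uses half-open intervals: chromStart is included, chromEnd is not.
def overlaps(a, b):
    """Two features overlap if they share a chromosome and their
    half-open [start, end) ranges intersect."""
    return a[0] == b[0] and a[1] < b[2] and b[1] < a[2]

def range_query(features, chrom, start, end):
    """Query 1: features falling within a given region."""
    probe = (chrom, start, end, None)
    return [f for f in features if overlaps(f, probe)]

def overlap_join(left, right):
    """Queries 2/3: all overlapping pairs. Quadratic sketch; a real
    system broadcasts the small side or sort-merges both sides."""
    return [(a, b) for a in left for b in right if overlaps(a, b)]

peaks = [("chr1", 713776, 714375, "peak.1"),
         ("chr1", 752401, 753000, "peak.2")]
genes = [("chr1", 714000, 715000, "geneA")]  # hypothetical reference feature
```

For example, `range_query(peaks, "chr1", 713000, 714000)` returns only peak.1, and `overlap_join(peaks, genes)` pairs peak.1 with geneA because their intervals intersect on chr1.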
Re: Spark Job Failed - Class not serializable
You’ll definitely want to use a Kryo-based serializer for Avro. We have a Kryo-based serializer that wraps the Avro efficient serializer here.

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Apr 3, 2015, at 5:41 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Because it's throwing up serializable exceptions, and Kryo is a serializer to serialize your objects.

Thanks
Best Regards

On Fri, Apr 3, 2015 at 5:37 PM, Deepak Jain <deepuj...@gmail.com> wrote:

I meant that I did not have to use Kryo. Why will Kryo help fix this issue now?

Sent from my iPhone

On 03-Apr-2015, at 5:36 pm, Deepak Jain <deepuj...@gmail.com> wrote:

I was able to write a record that extends SpecificRecord (Avro); that class was not auto-generated. Do we need to do something extra for auto-generated classes?

Sent from my iPhone

On 03-Apr-2015, at 5:06 pm, Akhil Das <ak...@sigmoidanalytics.com> wrote:

This thread might give you some insights: http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201311.mbox/%3CCA+WVT8WXbEHac=N0GWxj-s9gqOkgG0VRL5B=ovjwexqm8ev...@mail.gmail.com%3E

Thanks
Best Regards

On Fri, Apr 3, 2015 at 3:53 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

My Spark job failed with:

15/04/03 03:15:36 INFO scheduler.DAGScheduler: Job 0 failed: saveAsNewAPIHadoopFile at AbstractInputHelper.scala:103, took 2.480175 s
15/04/03 03:15:36 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Task 0.0 in stage 2.0 (TID 0) had a not serializable result: com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum
Serialization stack:
- object not serializable (class: com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum, value: {userId: 0, spsPrgrmId: 0, spsSlrLevelCd: 0, spsSlrLevelSumStartDt: null, spsSlrLevelSumEndDt: null, currPsLvlId: null})
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (0,{userId: 0, spsPrgrmId: 0, spsSlrLevelCd: 0, spsSlrLevelSumStartDt: null, spsSlrLevelSumEndDt: null, currPsLvlId: null}))
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 2.0 (TID 0) had a not serializable result: com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum
Serialization stack:
- object not serializable (class: com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum, value: {userId: 0, spsPrgrmId: 0, spsSlrLevelCd: 0, spsSlrLevelSumStartDt: null, spsSlrLevelSumEndDt: null, currPsLvlId: null})
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (0,{userId: 0, spsPrgrmId: 0, spsSlrLevelCd: 0, spsSlrLevelSumStartDt: null, spsSlrLevelSumEndDt: null, currPsLvlId: null}))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/04/03 03:15:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Task 0.0 in stage 2.0 (TID 0) had a not serializable result: com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum Serialization stack: ...)

com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum is auto-generated from an Avro schema using the Avro generate-sources Maven plugin:

package com.ebay.ep.poc.spark.reporting.process.model.dw;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class SpsLevelMetricSum extends org.apache.avro.specific.SpecificRecordBase
  implements org.apache.avro.specific.SpecificRecord {
  ...
}

Can anyone suggest how to fix this?

--
Deepak
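The shape of the fix Frank describes (register a custom serializer for a class the default serializer cannot handle) has a pure-Python analogue in `copyreg`. The sketch below is an analogy only, not Spark or Kryo code: a class carrying an unpicklable member fails under the default serializer, then round-trips cleanly once a custom reducer is registered for it, just as Avro SpecificRecord classes fail under Java serialization until a Kryo serializer is registered.

```python
import copyreg
import pickle

class Record:
    """Stand-in for an Avro-generated record class (name is made up)."""
    def __init__(self, user_id):
        self.user_id = user_id
        self._scratch = (i for i in range(3))  # generators cannot be pickled

def reduce_record(rec):
    """Custom reducer: serialize only the logical state; transient
    members are rebuilt by the constructor on deserialization."""
    return (Record, (rec.user_id,))

rec = Record(42)
try:
    pickle.dumps(rec)          # default serializer trips on the generator
    default_ok = True
except TypeError:
    default_ok = False

copyreg.pickle(Record, reduce_record)   # register the custom serializer
restored = pickle.loads(pickle.dumps(rec))
```

In Spark terms, `copyreg.pickle(...)` plays the role of setting `spark.serializer` to Kryo and registering an Avro-aware serializer for the generated classes.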
Re: Worker and Nodes
There could be many different things causing this. For example, if you only have a single partition of data, increasing the number of tasks will only increase execution time due to higher scheduling overhead. Additionally, how large is a single partition in your application relative to the amount of memory on the machine? If you are running on a machine with a small amount of memory, increasing the number of executors per machine may increase GC/memory pressure. On a single node, since your executors share a memory and I/O system, you could just thrash everything.

In any case, you can’t normally generalize between increased parallelism on a single node and increased parallelism across a cluster. If you are purely limited by CPU, then yes, you can normally make that generalization. However, when you increase the number of workers in a cluster, you are providing your app with more resources (memory capacity and bandwidth, and disk bandwidth). When you increase the number of tasks executing on a single node, you do not increase the pool of available resources.

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Feb 21, 2015, at 4:11 PM, Deep Pradhan <pradhandeep1...@gmail.com> wrote:

No, I just have a single-node standalone cluster. I am not tweaking the code to increase parallelism; I am just running the SparkKMeans example that is there in Spark 1.0.0. I just wanted to know if this behavior is natural, and if so, what causes it.

Thank you

On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen <so...@cloudera.com> wrote:

What's your storage like? Are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers.

To answer your question: no, in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelized to use all available execution slots, adding more slots doesn't do anything.

On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan <pradhandeep1...@gmail.com> wrote:

Yes, I am talking about a standalone single-node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for what is happening? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I have a standalone mode. I am varying the workers from 1 to 16.

On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen <so...@cloudera.com> wrote:

I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?), so you may execute more remotely. Are you increasing parallelism? For trivial jobs, chopping them up further may cause you to pay more overhead of managing so many small tasks, for no speed-up in execution time. Can you provide any more specifics, though? You haven't said what you're running, what mode, how many workers, how long it takes, etc.

On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan <pradhandeep1...@gmail.com> wrote:

Hi,
I have been running some jobs on my local single-node standalone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases with an increase in the number of workers. I repeated the experiments varying the number of nodes in a cluster too, and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster?

Thank You
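The overhead argument in this thread can be captured with a toy cost model: total runtime with n workers is the evenly divided work plus a fixed per-task scheduling/communication cost for each worker's task. The constants below are illustrative, not measured; the point is only the shape of the curves.

```python
# Toy cost model (not a Spark measurement): work divides evenly across
# n workers, but every task still pays a fixed scheduling overhead.
def runtime(work, n_workers, per_task_overhead):
    return work / n_workers + per_task_overhead * n_workers

# A large job keeps improving as workers are added...
big = [runtime(1000.0, n, 0.5) for n in (1, 2, 4, 8)]

# ...while a trivial job is soon dominated by overhead and gets SLOWER,
# matching the behavior Deep Pradhan observed with small data sets.
small = [runtime(4.0, n, 0.5) for n in (1, 2, 4, 8)]
```

This only models the scheduling-overhead effect Sean mentions; the shared-memory/I-O contention Frank describes would add further terms that grow with workers per node.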
Re: K-Means final cluster centers
Unless I misunderstood your question, you’re looking for the val clusterCenters in http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel, no?

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Feb 5, 2015, at 2:35 PM, SK <skrishna...@gmail.com> wrote:

Hi,

I am trying to get the final cluster centers after running the KMeans algorithm in MLlib, in order to characterize the clusters. But KMeansModel does not appear to have any public method to retrieve this info; there appears to be only a private method called clusterCentersWithNorm. I guess I could call predict() to get the final cluster assignment for the dataset and write my own code to compute the means based on this final assignment, but I would like to know if there is a way to get this info from the MLlib API directly after running KMeans.

thanks
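For reference, the fallback SK describes (recompute centers from the final predict() assignments) is just a per-cluster mean. A plain-Python sketch on illustrative 2-D data (this is not the MLlib API, which exposes the same result directly as clusterCenters):

```python
# Recompute k-means centers as per-cluster means of the assigned points.
def centers_from_assignments(points, assignments, k):
    sums = [[0.0, 0.0] for _ in range(k)]  # 2-D points for brevity
    counts = [0] * k
    for p, c in zip(points, assignments):
        sums[c][0] += p[0]
        sums[c][1] += p[1]
        counts[c] += 1
    # Assumes every cluster received at least one point.
    return [[s[0] / n, s[1] / n] for s, n in zip(sums, counts)]

pts = [(0.0, 0.0), (2.0, 0.0), (10.0, 10.0)]   # made-up sample data
assign = [0, 0, 1]                              # e.g. from model.predict
centers = centers_from_assignments(pts, assign, 2)
```

Here the two centers come out as the means of clusters {(0,0),(2,0)} and {(10,10)}.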
Re: How to 'Pipe' Binary Data in Apache Spark
Venkat,

No problem!

> So, creating a custom InputFormat or using sc.binaryFiles alone is not the right solution. We also need the modified version of RDD.pipe to support binary data? Is my understanding correct?

Yep! That is correct. The custom InputFormat allows Spark to load binary-formatted data from disk/HDFS/S3/etc., but then the default RDD.pipe reads/writes text to a pipe, so you’d need the custom mapPartitions call.

> If yes, can this be added as a new enhancement Jira request?

The code that I have right now is fairly custom to my application, but if there is interest, I would be glad to port it for the Spark core.

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Jan 22, 2015, at 7:11 AM, Venkat, Ankam <ankam.ven...@centurylink.com> wrote:

Thanks Frank for your response.

So, creating a custom InputFormat or using sc.binaryFiles alone is not the right solution. We also need the modified version of RDD.pipe to support binary data? Is my understanding correct?

If yes, can this be added as a new enhancement Jira request?

Nick: What’s your take on this?

Regards,
Venkat Ankam

From: Frank Austin Nothaft [mailto:fnoth...@berkeley.edu]
Sent: Wednesday, January 21, 2015 12:30 PM
To: Venkat, Ankam
Cc: Nick Allen; user@spark.apache.org
Subject: Re: How to 'Pipe' Binary Data in Apache Spark

Hi Venkat/Nick,

The Spark RDD.pipe method pipes text data into a subprocess and then receives text data back from that process. Once you have the binary data loaded into an RDD properly, to pipe binary data to/from a subprocess (i.e., you want the data in the pipes to contain binary, not text), you need to implement your own, modified version of RDD.pipe. The implementation of RDD.pipe spawns a process per partition (IIRC), as well as threads for writing to and reading from the process (and for the process's stderr). When writing via RDD.pipe, Spark calls *.toString on each object and pushes that text representation down the pipe.

There is an example of how to pipe binary data from within a mapPartitions call using the Scala API in lines 107-177 of this file. This specific code contains some nastiness around the packaging of downstream libraries that we rely on in that project, so I’m not sure if it is the cleanest way, but it is a workable way.

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Jan 21, 2015, at 9:17 AM, Venkat, Ankam <ankam.ven...@centurylink.com> wrote:

I am trying to solve a similar problem. I am using option #2 as suggested by Nick. I have created an RDD with sc.binaryFiles for a list of .wav files, but I am not able to pipe it to the external programs. For example:

sq = sc.binaryFiles("wavfiles")  <-- all .wav files stored in the "wavfiles" directory on HDFS
sq.keys().collect()  <-- works fine; shows the list of file names
sq.values().collect()  <-- works fine; shows the content of the files
sq.values().pipe(lambda x: subprocess.call(['/usr/local/bin/sox', '-t', 'wav', '-', '-n', 'stats'])).collect()  <-- does not work; tried different options
AttributeError: 'function' object has no attribute 'read'

Any suggestions?

Regards,
Venkat Ankam

From: Nick Allen [mailto:n...@nickallen.org]
Sent: Friday, January 16, 2015 11:46 AM
To: user@spark.apache.org
Subject: Re: How to 'Pipe' Binary Data in Apache Spark

I just wanted to reiterate the solution for the benefit of the community.

The problem is not from my use of 'pipe', but that 'textFile' cannot be used to read in binary data. (Doh) There are a couple of options to move forward:

1. Implement a custom 'InputFormat' that understands the binary input data. (Per Sean Owen)
2. Use 'SparkContext.binaryFiles' to read in the entire binary file as a single record. This will impact performance, as it prevents the use of more than one mapper on the file's data.

In my specific case, for #1 I can only find one project, from RIPE-NCC (https://github.com/RIPE-NCC/hadoop-pcap), that does this. Unfortunately, it appears to only support a limited set of network protocols.

On Fri, Jan 16, 2015 at 10:40 AM, Nick Allen <n...@nickallen.org> wrote:

Per your last comment, it appears I need something like this: https://github.com/RIPE-NCC/hadoop-pcap

Thanks a ton. That got me oriented in the right direction.

On Fri, Jan 16, 2015 at 10:20 AM, Sean Owen <so...@cloudera.com> wrote:

Well, it looks like you're reading some kind of binary file as text. That isn't going to work, in Spark or elsewhere, as binary data is not even necessarily a valid encoding of a string. There are no line breaks to delimit lines and thus elements of the RDD. Your input has some record structure (or else it's not really useful to put it into an RDD). You can encode this as a SequenceFile and read it with objectFile
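The mapPartitions-style workaround Frank describes boils down to opening the subprocess yourself and moving raw bytes through its stdin/stdout, instead of letting RDD.pipe stringify each record. A self-contained Python sketch of just the binary-piping step (`wc -c` stands in for a real binary tool like sox; in a Spark job this function body would run inside rdd.mapPartitions(...)):

```python
import subprocess

def pipe_binary(partition_bytes, cmd):
    """Feed raw bytes to a subprocess and return its raw byte output.
    Unlike RDD.pipe, nothing is converted to text along the way."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    out, _ = proc.communicate(partition_bytes)  # bytes in, bytes out
    return out

payload = b"\x00\x01\x02RIFF"            # arbitrary binary, not valid text
echoed = pipe_binary(payload, ["cat"])   # bytes survive the pipe intact
counted = pipe_binary(payload, ["wc", "-c"])
```

This also shows what went wrong in the sox attempt above: `subprocess.call` inside `pipe()` never connects the record bytes to the child's stdin, whereas `Popen.communicate` does.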
Re: How to 'Pipe' Binary Data in Apache Spark
Hi Venkat/Nick, The Spark RDD.pipe method pipes text data into a subprocess and then receives text data back from that process. Once you have the binary data loaded into an RDD properly, to pipe binary data to/from a subprocess (e.g., you want the data in the pipes to contain binary, not text), you need to implement your own, modified version of RDD.pipe. The implementation of RDD.pipe spawns a process per partition (IIRC), as well as threads for writing to and reading from the process (as well as stderr for the process). When writing via RDD.pipe, Spark calls *.toString on the object, and pushes that text representation down the pipe. There is an example of how to pipe binary data from within a mapPartitions call using the Scala API in lines 107-177 of this file. This specific code contains some nastiness around the packaging of downstream libraries that we rely on in that project, so I’m not sure if it is the cleanest way, but it is a workable way. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Jan 21, 2015, at 9:17 AM, Venkat, Ankam ankam.ven...@centurylink.com wrote: I am trying to solve similar problem. I am using option # 2 as suggested by Nick. I have created an RDD with sc.binaryFiles for a list of .wav files. But, I am not able to pipe it to the external programs. For example: sq = sc.binaryFiles(wavfiles) ß All .wav files stored on “wavfiles” directory on HDFS sq.keys().collect() ß works fine. Shows the list of file names. sq.values().collect() ß works fine. Shows the content of the files. sq.values().pipe(lambda x: subprocess.call(['/usr/local/bin/sox', '-t' 'wav', '-', '-n', 'stats'])).collect() ß Does not work. Tried different options. AttributeError: 'function' object has no attribute 'read' Any suggestions? 
Regards, Venkat Ankam From: Nick Allen [mailto:n...@nickallen.org] Sent: Friday, January 16, 2015 11:46 AM To: user@spark.apache.org Subject: Re: How to 'Pipe' Binary Data in Apache Spark I just wanted to reiterate the solution for the benefit of the community. The problem is not from my use of 'pipe', but that 'textFile' cannot be used to read in binary data. (Doh) There are a couple options to move forward. 1. Implement a custom 'InputFormat' that understands the binary input data. (Per Sean Owen) 2. Use 'SparkContext.binaryFiles' to read in the entire binary file as a single record. This will impact performance as it prevents the use of more than one mapper on the file's data. In my specific case for #1 I can only find one project from RIPE-NCC (https://github.com/RIPE-NCC/hadoop-pcap) that does this. Unfortunately, it appears to only support a limited set of network protocols. On Fri, Jan 16, 2015 at 10:40 AM, Nick Allen n...@nickallen.org wrote: Per your last comment, it appears I need something like this: https://github.com/RIPE-NCC/hadoop-pcap Thanks a ton. That get me oriented in the right direction. On Fri, Jan 16, 2015 at 10:20 AM, Sean Owen so...@cloudera.com wrote: Well it looks like you're reading some kind of binary file as text. That isn't going to work, in Spark or elsewhere, as binary data is not even necessarily the valid encoding of a string. There are no line breaks to delimit lines and thus elements of the RDD. Your input has some record structure (or else it's not really useful to put it into an RDD). You can encode this as a SequenceFile and read it with objectFile. You could also write a custom InputFormat that knows how to parse pcap records directly. On Fri, Jan 16, 2015 at 3:09 PM, Nick Allen n...@nickallen.org wrote: I have an RDD containing binary data. I would like to use 'RDD.pipe' to pipe that binary data to an external program that will translate it to string/text data. 
Unfortunately, it seems that Spark is mangling the binary data before it gets passed to the external program. This code is representative of what I am trying to do. What am I doing wrong? How can I pipe binary data in Spark? Maybe it is getting corrupted when I read it in initially with 'textFile'?

bin = sc.textFile("binary-data.dat")
csv = bin.pipe("/usr/bin/binary-to-csv.sh")
csv.saveAsTextFile("text-data.csv")

Specifically, I am trying to use Spark to transform pcap (packet capture) data to text/csv so that I can perform an analysis on it. Thanks! -- Nick Allen n...@nickallen.org
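Sean's point above is that binary input only becomes an RDD once something knows the record boundaries, which is what a custom InputFormat supplies. As a plain-Python illustration of the shape of that parsing, here is a splitter for a hypothetical length-prefixed binary format; real pcap framing differs (it has global and per-packet headers), so this is only a sketch of the idea.

```python
import struct

def split_records(data):
    """Split a byte string into records of a hypothetical binary format:
    each record is a 4-byte big-endian length followed by that many payload
    bytes. A custom InputFormat would perform this kind of framing so that
    each RDD element is one complete record."""
    records = []
    offset = 0
    while offset < len(data):
        # Read the length prefix, then slice out exactly that many bytes.
        (length,) = struct.unpack_from(">I", data, offset)
        offset += 4
        records.append(data[offset:offset + length])
        offset += length
    return records
```

Combined with sc.binaryFiles, a flatMap over split_records would recover per-record parallelism downstream of the single-mapper read.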
Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro
Shailesh, To add, are you packaging Hadoop in your app? Hadoop will pull in Guava. Not sure if you are using Maven (or what) to build, but if you can pull up your build's dependency tree, you will likely find com.google.guava being brought in by one of your dependencies. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Jan 20, 2015, at 5:13 PM, Shailesh Birari sbirar...@gmail.com wrote: Hello, I double checked the libraries. I am linking only with Spark 1.2. Along with Spark 1.2 jars I have Scala 2.10 jars and JRE 7 jars linked and nothing else. Thanks, Shailesh On Wed, Jan 21, 2015 at 12:58 PM, Sean Owen so...@cloudera.com wrote: Guava is shaded in Spark 1.2+. It looks like you are mixing versions of Spark then, with some that still refer to unshaded Guava. Make sure you are not packaging Spark with your app and that you don't have other versions lying around. On Tue, Jan 20, 2015 at 11:55 PM, Shailesh Birari sbirar...@gmail.com wrote: Hello, I recently upgraded my setup from Spark 1.1 to Spark 1.2. My existing applications are working fine on the Ubuntu cluster. But, when I try to execute a Spark MLlib application from Eclipse (Windows node) it gives a java.lang.NoClassDefFoundError: com/google/common/base/Preconditions exception. Note, 1. With Spark 1.1 this was working fine. 2. The Spark 1.2 jar files are linked in the Eclipse project. 3. Checked the jar -tf output and found the above com.google.common.base is not present. 
-Exception log:

Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
	at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:94)
	at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:77)
	at org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:62)
	at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:340)
	at org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74)
	at org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
	at java.net.URLClassLoader$1.run(Unknown Source)
	at java.net.URLClassLoader$1.run(Unknown Source)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(Unknown Source)
	at java.lang.ClassLoader.loadClass(Unknown Source)
	at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
	at java.lang.ClassLoader.loadClass(Unknown Source)
	... 7 more

- jar -tf output:

consb2@CONSB2A /cygdrive/c/SB/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib
$ jar -tf spark-assembly-1.2.0-hadoop2.4.0.jar | grep Preconditions
org/spark-project/guava/common/base/Preconditions.class
org/spark-project/guava/common/math/MathPreconditions.class
com/clearspring/analytics/util/Preconditions.class
parquet/Preconditions.class
com/google/inject/internal/util/$Preconditions.class

--- Please help me in resolving this. Thanks, Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-com-google-common-base-Preconditions-java-lang-NoClassDefFoundErro-tp21271.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: AVRO specific records
Hi Simone, Matt Massie put together a good tutorial on his blog. If you’re looking for more code using Avro, we use it pretty extensively in our genomics project. Our Avro schemas are here, and we have serialization code here. We use Parquet for storing the Avro records, but there is also an Avro HadoopInputFormat. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Nov 5, 2014, at 1:25 PM, Simone Franzini captainfr...@gmail.com wrote: How can I read/write AVRO specific records? I found several snippets using generic records, but nothing with specific records so far. Thanks, Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini
Re: Is it possible to use Parquet with Dremel encoding
Hi Matthes, Can you post an example of your schema? When you refer to nesting, are you referring to optional columns, nested schemas, or tables where there are repeated values? Parquet uses run-length encoding to compress down columns with repeated values, which is the case that your example seems to refer to. The point Matt is making in his post is that if you have Parquet files that contain records with a nested schema, e.g.:

record MyNestedSchema {
  int nestedSchemaField;
}

record MySchema {
  int nonNestedField;
  MyNestedSchema nestedRecord;
}

Not all systems support queries against these schemas. If you want to load the data directly into Spark, it isn’t an issue. I’m not familiar with how SparkSQL is handling this, but I believe the bit you quoted is saying that support for nested queries (e.g., select ... from … where nestedRecord.nestedSchemaField == 0) will be added in Spark 1.0.1 (which is currently available, BTW). Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Sep 26, 2014, at 7:38 AM, matthes mdiekst...@sensenetworks.com wrote: Thank you Jey, That is a nice introduction, but it may be too old (Aug 21st, 2013): Note: If you keep the schema flat (without nesting), the Parquet files you create can be read by systems like Shark and Impala. These systems allow you to query Parquet files as tables using SQL-like syntax. The Parquet files created by this sample application could easily be queried using Shark for example. But in this post (http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Nested-CaseClass-Parquet-failure-td8377.html) I found this: Nested parquet is not supported in 1.0, but is part of the upcoming 1.0.1 release. So the question now is: can I get the benefit of nested Parquet files for fast SQL lookups, or do I have to write a special map/reduce job to transform and find my data? 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-Parquet-with-Dremel-encoding-tp15186p15234.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
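The nested query Frank describes above (select … where nestedRecord.nestedSchemaField == 0) has a simple shape that can be illustrated without Parquet at all. Below is a plain-Python stand-in using dicts for records mirroring the MySchema/MyNestedSchema example; the record values are invented for illustration.

```python
# Records shaped like MySchema, with a nested MyNestedSchema field.
records = [
    {"nonNestedField": 1, "nestedRecord": {"nestedSchemaField": 0}},
    {"nonNestedField": 2, "nestedRecord": {"nestedSchemaField": 7}},
    {"nonNestedField": 3, "nestedRecord": {"nestedSchemaField": 0}},
]

def nested_query(rows):
    """select nonNestedField where nestedRecord.nestedSchemaField == 0 —
    the predicate descends into the nested record, which is exactly the
    capability that not all SQL-on-Parquet systems supported at the time."""
    return [r["nonNestedField"] for r in rows
            if r["nestedRecord"]["nestedSchemaField"] == 0]
```

Loading the data directly into Spark and filtering like this always works; the open question in the thread was only whether the SQL layer could push such predicates into nested Parquet schemas.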
Re: Is it possible to use Parquet with Dremel encoding
Matthes, Ah, gotcha! Repeated items in Parquet seem to correspond to the ArrayType in Spark-SQL. I only use Spark, but it does look like that should be supported in Spark-SQL 1.1.0. I’m not sure though if you can apply predicates on repeated items from Spark-SQL. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Sep 26, 2014, at 8:48 AM, matthes mdiekst...@sensenetworks.com wrote: Hi Frank, thanks a lot for your response, this is very helpful! Actually I'm trying to figure out whether the current Spark version supports repetition levels (https://blog.twitter.com/2013/dremel-made-simple-with-parquet), and now it looks good to me. It is very hard to find good material about that. Now I found this as well: https://git-wip-us.apache.org/repos/asf?p=spark.git;a=blob;f=sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala;h=1dc58633a2a68cd910c1bab01c3d5ee1eb4f8709;hb=f479cf37 I wasn't sure of that because nested data can be many different things! If it works with SQL, being able to find the firstRepeatedid or secoundRepeatedid would be awesome. But if it only works with a kind of map/reduce job, that is also good. The most important thing is to filter on the first or second repeated value as fast as possible, and in combination as well. I'll start playing with these things now to get the best search results! My schema looks like this:

val nestedSchema = message nestedRowSchema {
  int32 firstRepeatedid;
  repeated group level1 {
    int64 secoundRepeatedid;
    repeated group level2 {
      int64 value1;
      int32 value2;
    }
  }
}

Best, Matthes -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-Parquet-with-Dremel-encoding-tp15186p15239.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
Re: Kryo fails with avro having Arrays and unions, but succeeds with simple avro.
Hi Mohan, It’s a bit convoluted to follow in their source, but they essentially typedef KSerializer as being a KryoSerializer, and then their serializers all extend KSerializer. Spark should identify them properly as Kryo serializers, but I haven’t tried it myself. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Sep 19, 2014, at 12:03 AM, mohan.gadm mohan.g...@gmail.com wrote: Thanks for the info, Frank. Twitter's chill avro serializer looks great. But how does Spark identify it as a serializer, as it's not extending KryoSerializer? (sorry, Scala is an alien lang for me). - Thanks Regards, Mohan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-fails-with-avro-having-Arrays-and-unions-but-succeeds-with-simple-avro-tp14549p14649.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: File I/O in spark
Kartheek, What exactly are you trying to do? Those APIs are only for local file access. If you want to access data in HDFS, you’ll want to use one of the reader methods in org.apache.spark.SparkContext which will give you an RDD (e.g., newAPIHadoopFile, sequenceFile, or textFile). If you want to write data to HDFS, you’ll need to have an RDD and use one of the functions in org.apache.spark.RDD (saveAsObjectFile or saveAsTextFile) or one of the PairRDDFunctions (e.g., saveAsNewAPIHadoopFile or saveAsNewAPIHadoopDataset). The Scala/Java IO system can be used inside of Spark, but only for local file access. This is used to implement the rdd.pipe method (IIRC), and we use it in some downstream apps to do IO with processes that we spawn from mapPartitions calls (see here and here). Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Sep 15, 2014, at 9:44 AM, rapelly kartheek kartheek.m...@gmail.com wrote: The file gets created on the fly. So I dont know how to make sure that its accessible to all nodes. On Mon, Sep 15, 2014 at 10:10 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I see that the file gets created in the master node. But, there wont be any data written to it. On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com wrote: Is this code running in an executor? You need to make sure the file is accessible on ALL executors. One way to do that is to use a distributed filesystem like HDFS or GlusterFS. On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, I am trying to perform some read/write file operations in spark. Somehow I am neither able to write to a file nor read.

import java.io._
val writer = new PrintWriter(new File("test.txt"))
writer.write("Hello Scala")

Can someone please tell me how to perform file I/O in spark.
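The failure mode in this thread (the file appears on one node with no data) follows from where the code runs: local IO on an executor writes to that executor's own disk. When the result is small enough, the safe local-IO pattern is to bring the data back to the driver first and write there. A plain-Python sketch of that driver-side pattern (the function name is mine; in PySpark the list would come from rdd.collect()):

```python
def write_on_driver(records, path):
    """Write records to a single local file. In a cluster this is only
    correct on the driver after collect(); run on executors, each node
    would write its own partial copy on its own local disk, which is the
    behaviour described above."""
    with open(path, "w") as f:
        for record in records:
            f.write(record + "\n")

# Hypothetical PySpark usage:
#   write_on_driver(rdd.collect(), "test.txt")
```

For anything large, the saveAsTextFile/saveAsObjectFile routes Frank lists are the right tools, since they write partition-wise to a filesystem all nodes share.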
Re: distcp on ec2 standalone spark cluster
Tomer, To use distcp, you need to have a Hadoop compute cluster up. start-dfs just restarts HDFS. I don’t have a Spark 1.0.2 cluster up right now, but there should be a start-mapred*.sh or start-all.sh script that will launch the Hadoop MapReduce cluster that you will need for distcp. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Sep 8, 2014, at 12:28 AM, Tomer Benyamini tomer@gmail.com wrote: ~/ephemeral-hdfs/sbin/start-mapred.sh does not exist on spark-1.0.2; I restarted hdfs using ~/ephemeral-hdfs/sbin/stop-dfs.sh and ~/ephemeral-hdfs/sbin/start-dfs.sh, but still getting the same error when trying to run distcp:

ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered
java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
	at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121)
	at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:83)
	at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:76)
	at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352)
	at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146)
	at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.tools.DistCp.main(DistCp.java:374)

Any idea? Thanks! Tomer On Sun, Sep 7, 2014 at 9:27 PM, Josh Rosen rosenvi...@gmail.com wrote: If I recall, you should be able to start Hadoop MapReduce using ~/ephemeral-hdfs/sbin/start-mapred.sh. On Sun, Sep 7, 2014 at 6:42 AM, Tomer Benyamini tomer@gmail.com wrote: Hi, I would like to copy log files from s3 to the cluster's ephemeral-hdfs. I tried to use distcp, but I guess mapred is not running on the cluster - I'm getting the exception below. Is there a way to activate it, or is there a spark alternative to distcp? 
Thanks, Tomer

mapreduce.Cluster (Cluster.java:initialize(114)) - Failed to use org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid mapreduce.jobtracker.address configuration value for LocalJobRunner : XXX:9001
ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered
java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
	at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121)
	at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:83)
	at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:76)
	at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352)
	at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146)
	at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.tools.DistCp.main(DistCp.java:374)
Re: multiple passes in mapPartitions
Hi Zhen, The Scala iterator trait supports cloning via the duplicate method (http://www.scala-lang.org/api/current/index.html#scala.collection.Iterator@duplicate:(Iterator[A],Iterator[A])). Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Jun 13, 2014, at 9:28 PM, zhen z...@latrobe.edu.au wrote: Thank you for your suggestion. We will try it out and see how it performs. We think the single call to mapPartitions will be faster but we could be wrong. It would be nice to have a clone method on the iterator. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-passes-in-mapPartitions-tp7555p7616.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
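Frank's Iterator.duplicate suggestion is Scala-specific; for readers doing the same thing from PySpark, the analogous tool inside a mapPartitions call is itertools.tee, which yields two independent cursors over one iterator. A sketch (the two-pass normalization task is my own example, not from the thread):

```python
from itertools import tee

def two_pass(iterator):
    """Make two passes over a partition iterator: first find the maximum,
    then scale every element by it. Note that tee buffers elements between
    the two cursors, so a full first pass buffers the whole partition in
    memory — the same caveat applies to Iterator.duplicate in Scala."""
    first, second = tee(iterator)
    biggest = max(first)                    # pass 1 consumes (and buffers) all
    return [x / biggest for x in second]    # pass 2 replays the buffer

# Hypothetical PySpark usage:
#   rdd.mapPartitions(two_pass)
```

Whether this beats re-reading the partition depends on partition size versus available memory, which matches the performance question raised above.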
Re: Is it possible to use Spark, Maven, and Hadoop 2?
Robert, You can build a Spark application using Maven for Hadoop 2 by adding a dependency on the Hadoop 2.* hadoop-client package. If you define any Hadoop Input/Output formats, you may also need to depend on the hadoop-mapreduce package. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Sun, Jun 29, 2014 at 12:20 PM, Robert James srobertja...@gmail.com wrote: Although Spark's home page offers binaries for Spark 1.0.0 with Hadoop 2, the Maven repository only seems to have one version, which uses Hadoop 1. Is it possible to use a Maven link and Hadoop 2? What is the id? If not: How can I use the prebuilt binaries to use Hadoop 2? Do I just copy the lib/ dir into my classpath?
Re: Is it possible to use Spark, Maven, and Hadoop 2?
Hi Robert, I’m not sure about sbt; we’re currently using Maven to build. We do create a single jar though, via the Maven shade plugin. Our project has three components, and we routinely distribute the jar for our project’s CLI out across a cluster. If you’re interested, here are our project’s master pom and the pom for our CLI. There are a few dependencies that we exclude from hadoop-client: • asm/asm • org.jboss.netty/netty • org.codehaus.jackson/* • org.sonatype.sisu.inject/* We've built and run this successfully across both Hadoop 1.0.4 and 2.2.0-2.2.5. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Jun 29, 2014, at 4:20 PM, Robert James srobertja...@gmail.com wrote: On 6/29/14, FRANK AUSTIN NOTHAFT fnoth...@berkeley.edu wrote: Robert, You can build a Spark application using Maven for Hadoop 2 by adding a dependency on the Hadoop 2.* hadoop-client package. If you define any Hadoop Input/Output formats, you may also need to depend on the hadoop-mapreduce package. Thank you Frank. Is it possible to do sbt-assembly after that? I get conflicts, because Spark requires via Maven Hadoop 1. I've tried excluding that via sbt, but still get conflicts within Hadoop 2, with different components requiring different versions of other jars. Is it possible to make a jar assembly using your approach? How? If not: How do you distribute the jars to the workers? On Sun, Jun 29, 2014 at 12:20 PM, Robert James srobertja...@gmail.com wrote: Although Spark's home page offers binaries for Spark 1.0.0 with Hadoop 2, the Maven repository only seems to have one version, which uses Hadoop 1. Is it possible to use a Maven link and Hadoop 2? What is the id? If not: How can I use the prebuilt binaries to use Hadoop 2? Do I just copy the lib/ dir into my classpath?
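The exclusions Frank lists translate to something like the following in a pom dependency declaration. This is a sketch, not the actual pom from his project; the version property is a placeholder, and the `*` wildcard artifactId requires Maven 3.2.1 or newer (older Maven needs each artifact listed individually).

```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>asm</groupId>
      <artifactId>asm</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.jboss.netty</groupId>
      <artifactId>netty</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.sonatype.sisu.inject</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```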
Re: initial basic question from new user
RE: Given that our agg sizes will exceed memory, we expect to cache them to disk, so save-as-object (assuming there are no out of the ordinary performance issues) may solve the problem, but I was hoping to store data in a column-orientated format. However I think this in general is not possible - Spark can *read* Parquet, but I think it cannot write Parquet as a disk-based RDD format. Spark can write Parquet, via the ParquetOutputFormat which is distributed from Parquet. If you'd like example code for writing out to Parquet, please see the adamSave function in https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala, starting at line 62. There is a bit of setup necessary for the Parquet write codec, but otherwise it is fairly straightforward. Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Thu, Jun 12, 2014 at 7:03 AM, Christopher Nguyen c...@adatao.com wrote: Toby, #saveAsTextFile() and #saveAsObjectFile() are probably what you want for your use case. As for Parquet support, that's newly arrived in Spark 1.0.0 together with SparkSQL so continue to watch this space. Gerard's suggestion to look at JobServer, which you can generalize as building a long-running application which allows multiple clients to load/share/persist/save/collaborate-on RDDs, satisfies a larger, more complex use case. That is indeed the job of a higher-level application, subject to a wide variety of higher-level design choices. A number of us have successfully built Spark-based apps around that model. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Thu, Jun 12, 2014 at 4:35 AM, Toby Douglass t...@avocet.io wrote: On Thu, Jun 12, 2014 at 11:36 AM, Gerard Maas gerard.m...@gmail.com wrote: The goal of rdd.persist is to create a cached rdd that breaks the DAG lineage. 
Therefore, computations *in the same job* that use that RDD can re-use that intermediate result, but it's not meant to survive between job runs. As I understand it, Spark is designed for interactive querying, in the sense that the caching of intermediate results eliminates the need to recompute those results. However, if intermediate results last only for the duration of a job (e.g. say a python script), how exactly is interactive querying actually performed? A script is not an interactive medium. Is the shell the only medium for interactive querying? Consider a common usage case: a web-site, which offers reporting upon a large data set. Users issue arbitrary queries. A few queries (just with different arguments) dominate the query load, so we thought to create intermediate RDDs to service those queries, so only those order-of-magnitude-smaller RDDs would need to be processed. Where this is not possible, we can only use Spark for reporting by issuing each query over the whole data set - e.g. Spark is just like Impala is just like Presto is just like [nnn]. The enormous benefit of RDDs - the entire point of Spark - so profoundly useful here - is not available. What a huge and unexpected loss! Spark seemingly renders itself ordinary. It is for this reason I am surprised to find this functionality is not available. If you need to ad-hoc persist to files, you can save RDDs using rdd.saveAsObjectFile(...) [1] and load them afterwards using sparkContext.objectFile(...) I've been using this site for docs; http://spark.apache.org Here we find through the top-of-the-page menus the link API Docs - Python API which brings us to; http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html Where this page does not show the function saveAsObjectFile(). 
I find now from your link here; https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD What appears to be a second and more complete set of the same documentation, using a different web-interface to boot. It appears at least that there are two sets of documentation for the same APIs, where one set is out of date and the other not, and the out-of-date set is that which is linked to from the main site? Given that our agg sizes will exceed memory, we expect to cache them to disk, so save-as-object (assuming there are no out of the ordinary performance issues) may solve the problem, but I was hoping to store data in a column-orientated format. However I think this in general is not possible - Spark can *read* Parquet, but I think it cannot write Parquet as a disk-based RDD format. If you want to preserve the RDDs in memory between job runs, you should look at the Spark-JobServer [3] Thank you. I view this with some trepidation. It took two man-days to get Spark running (and I've spent another man day now trying to get a map/reduce to run; I'm getting there, but not there yet) - the bring-up/config experience for end-users
Re: Spark-ec2 asks for password
Aureliano, I've been noticing this error recently as well: ssh: connect to host ec-xx-xx-xx-xx.compute-1.amazonaws.com port 22: Connection refused Error 255 while executing remote command, retrying after 30 seconds However, this isn't an issue with the spark-ec2 scripts. After the scripts fail, if you wait a bit longer (e.g., another 2 minutes), the EC2 hosts will finish launching and port 22 will open up. Until the EC2 host has launched and opened port 22 for SSH, SSH cannot succeed, and the spark-ec2 scripts will fail. I've noticed that EC2 machine launch latency seems to be highest in Oregon; I haven't run into this problem on either the California or Virginia EC2 farms. To work around this issue, I've manually modified my copy of the EC2 scripts to wait for 6 failures (i.e., 3 minutes), which seems to work OK. Might be worth a try on your end. I can't comment about the password request; I haven't seen that on my end. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Fri, Apr 18, 2014 at 8:57 PM, Aureliano Buendia buendia...@gmail.com wrote: Hi, Since 0.9.0 spark-ec2 has gone unstable. During launch it throws many errors like: ssh: connect to host ec-xx-xx-xx-xx.compute-1.amazonaws.com port 22: Connection refused Error 255 while executing remote command, retrying after 30 seconds .. and recently, it prompts for passwords!: Warning: Permanently added '' (RSA) to the list of known hosts. Password: Note that the hostname in Permanently added '' is missing in the log, which is probably why it asks for a password. Is this a known bug?
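Frank's workaround (allow more failures before giving up) is just a retry-with-delay loop around the SSH probe. A sketch of the idea, independent of the actual spark-ec2 script — the function and parameter names here are mine, and the probe callable stands in for the script's remote ssh check:

```python
import time

def wait_for_ssh(probe, attempts=6, delay=30, sleep=time.sleep):
    """Retry a connectivity probe until it succeeds or `attempts` tries are
    exhausted, sleeping `delay` seconds between tries. Raising `attempts`
    gives a slow-launching EC2 host time to open port 22, which is the
    effect of the manual edit described above. Returns the number of tries
    it took to succeed."""
    for attempt in range(attempts):
        if probe():
            return attempt + 1
        if attempt < attempts - 1:
            sleep(delay)
    raise RuntimeError("host never became reachable over SSH")
```

The `sleep` parameter is injected only so the loop can be exercised without real waiting; a real probe might attempt a TCP connection to port 22.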
Re: Avro serialization
We use avro objects in our project, and have a Kryo serializer for generic Avro SpecificRecords. Take a look at: https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/ADAMKryoRegistrator.scala Also, Matt Massie has a good blog post about this at http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/. Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Thu, Apr 3, 2014 at 7:16 AM, Ian O'Connell i...@ianoconnell.com wrote: Objects being transformed need to be one of these in flight. Source data can just use the mapreduce input formats, so anything you can do with mapred. For an Avro version of this you probably want one of: https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/*ProtoBuf* or just whatever you're using at the moment to open them in a MR job could probably be re-purposed. On Thu, Apr 3, 2014 at 7:11 AM, Ron Gonzalez zlgonza...@yahoo.com wrote: Hi, I know that sources need to either be java serializable or use kryo serialization. Does anyone have sample code that reads, transforms and writes avro files in spark? Thanks, Ron