Re: Catalog, SessionCatalog and ExternalCatalog in spark 2.0
Thanks Raghavendra :) Will look into the Analyzer as well.

Kapil Malik
*Sr. Principal Engineer | Data Platform, Technology*
M: +91 8800836581 | T: 0124-433 | EXT: 20910
ASF Centre A | 1st Floor | Udyog Vihar Phase IV | Gurgaon | Haryana | India

On Sat, Sep 3, 2016 at 7:27 PM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:
> Kapil -- I'm afraid you need to plug in your own SessionCatalog, as the
> ResolveRelations rule depends on it. To keep the design consistent, you may
> want to implement an ExternalCatalog as well.
> You can also look at plugging in your own Analyzer class to give you more
> flexibility; ultimately that is where all relations get resolved from the
> SessionCatalog.
>
> On Sat, Sep 3, 2016 at 5:49 PM, Kapil Malik <kapil.ma...@snapdeal.com> wrote:
>> Hi all,
>>
>> I have a Spark SQL 1.6 application in production which does the following on
>> executing sqlContext.sql(...) -
>> 1. Identify the table name mentioned in the query
>> 2. Use an external database to decide where the data is located, and in which
>>    format (Parquet, CSV or JDBC), etc.
>> 3. Load the DataFrame
>> 4. Register it as a temp table (for future calls to this table)
>>
>> This is achieved by extending HiveContext, and correspondingly HiveCatalog.
>> I have my own implementation of the trait "Catalog", which overrides the
>> "lookupRelation" method to do the magic behind the scenes.
>>
>> However, in Spark 2.0 I see the following -
>> SessionCatalog - contains a lookupRelation method, but has no interface /
>> abstract class above it.
>> ExternalCatalog - deals with CatalogTable instead of a DataFrame / LogicalPlan.
>> Catalog - also doesn't expose any method to look up a DataFrame / LogicalPlan.
>>
>> So apparently it looks like I need to extend SessionCatalog only. However, I
>> just wanted feedback on whether there's a better / recommended approach to
>> achieve this.
>>
>> Thanks and regards,
>>
>> Kapil Malik
>> *Sr. Principal Engineer | Data Platform, Technology*
Catalog, SessionCatalog and ExternalCatalog in spark 2.0
Hi all,

I have a Spark SQL 1.6 application in production which does the following on executing sqlContext.sql(...) -
1. Identify the table name mentioned in the query
2. Use an external database to decide where the data is located, and in which format (Parquet, CSV or JDBC), etc.
3. Load the DataFrame
4. Register it as a temp table (for future calls to this table)

This is achieved by extending HiveContext, and correspondingly HiveCatalog. I have my own implementation of the trait "Catalog", which overrides the "lookupRelation" method to do the magic behind the scenes.

However, in Spark 2.0 I see the following -
SessionCatalog - contains a lookupRelation method, but has no interface / abstract class above it.
ExternalCatalog - deals with CatalogTable instead of a DataFrame / LogicalPlan.
Catalog - also doesn't expose any method to look up a DataFrame / LogicalPlan.

So apparently it looks like I need to extend SessionCatalog only. However, I just wanted feedback on whether there's a better / recommended approach to achieve this.

Thanks and regards,

Kapil Malik
*Sr. Principal Engineer | Data Platform, Technology*
M: +91 8800836581 | T: 0124-433 | EXT: 20910
ASF Centre A | 1st Floor | Udyog Vihar Phase IV | Gurgaon | Haryana | India
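[For reference, a rough and untested sketch of what overriding lookupRelation in a custom SessionCatalog could look like in Spark 2.0. These are internal APIs, so the constructor arguments and exact method signatures differ across 2.x releases, and the helper that consults the external metadata store is purely hypothetical.]

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// NOTE: SessionCatalog's real constructor takes more parameters (function registry,
// conf, etc.); the single-argument form here is only to keep the sketch short.
class MySessionCatalog(externalCatalog: ExternalCatalog)
  extends SessionCatalog(externalCatalog) {

  override def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
    if (!tableExists(name)) {
      // Hypothetical helper: consult the external metadata service, load the data
      // (parquet / csv / jdbc), and register it as a temp view for future lookups.
      val plan: LogicalPlan = loadFromExternalStore(name)
      createTempView(name.table, plan, overrideIfExists = true)
    }
    super.lookupRelation(name, alias)
  }

  private def loadFromExternalStore(name: TableIdentifier): LogicalPlan = {
    // e.g. something like spark.read.format(...).load(...).queryExecution.analyzed
    ???
  }
}

[As Raghavendra notes above, the Analyzer's ResolveRelations rule resolves tables through whatever SessionCatalog the session state exposes, so the custom catalog (and possibly a custom Analyzer / session state) still has to be wired into the session.]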
Design query regarding dataframe usecase
Hi,

We have an analytics use case where we are collecting user click logs. The data can be considered hierarchical, with 3 types of logs -
- User (attributes like userId, emailId)
- Session (attributes like sessionId, device, OS, browser, city etc.)
- - PageView (attributes like url, referrer, page-type etc.)

userId is present in every Session and PageView log; sessionId is present in every PageView log.

*Objective*: To store the data in a queryable format to run queries like -
"select city, unique(user), count(session), count(pageView) group by city"
i.e. to get the number of (unique) users, sessions and pageviews per city. This is just an example, but you get the idea that a query may span the hierarchical nature of the data, and should allow for select, aggregate, group-by and where clauses.

*RDD approach:*
* Read all the logs as a single RDD
* Convert to a pair RDD
* GroupByKey on the RDD
* Shift the key and flat-map to a pair RDD

As I understand, with Catalyst and Tungsten, DataFrames are a lot more optimized than vanilla RDDs. So can anyone suggest how I can support such queries using DataFrames (purely, or at least more optimally)? One way I thought of was to create different DataFrames for the User, Session and PageView logs. However, I would then have to join them, which will cause a network shuffle. If I keep all the logs originally partitioned on user, read them as a single DataFrame, split it into 2 DataFrames (based on log type = session and log type = pageView), and do a join, will it still cause a network shuffle?

Thanks,
Kapil
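[One possible DataFrame formulation of the example query, assuming the logs are already flattened into one source with a logType discriminator column. The input path and all column names below are assumptions, not taken from the thread.]

import org.apache.spark.sql.functions._
import sqlContext.implicits._

// hypothetical unified log source; each record carries a logType column
val logs = sqlContext.read.json("/data/clicklogs")

val sessions  = logs.filter($"logType" === "session")
                    .select($"sessionId", $"userId", $"city")
val pageViews = logs.filter($"logType" === "pageView")
                    .select($"sessionId", $"url")

// join pageViews back to sessions to pick up city, then aggregate per city
val perCity = pageViews.join(sessions, "sessionId")
  .groupBy($"city")
  .agg(countDistinct($"userId").as("users"),
       countDistinct($"sessionId").as("sessions"),
       count(lit(1)).as("pageViews"))

[On the shuffle question: splitting one DataFrame into two and joining them back generally still produces a shuffle (or a broadcast if one side is small), because the optimizer does not know the two sides share a partitioning. Pre-partitioning the data by the join key, or denormalizing city onto the pageView records at ingestion time, are the usual ways to avoid it.]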
RE: Problem getting program to run on 15TB input
Very interesting and relevant thread for production-level usage of Spark. @Arun, can you kindly confirm if Daniel's suggestion helped your use case?

Thanks,
Kapil Malik | kma...@adobe.com | 33430 / 8800836581

From: Daniel Mahler [mailto:dmah...@gmail.com]
Sent: 13 April 2015 15:42
To: Arun Luthra
Cc: Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
Subject: Re: Problem getting program to run on 15TB input

Sometimes a large number of partitions leads to memory problems. Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.

On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:
Everything works smoothly if I do the 99%-removal filter in Hive first. So, all the baggage from garbage collection was breaking it. Is there a way to filter() out 99% of the data without having to garbage collect 99% of the RDD?

On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:
I tried a shorter, simpler version of the program, with just 1 RDD; essentially it is:

sc.textFile(..., N).map().filter().map( blah => (id, 1L)).reduceByKey().saveAsTextFile(...)

Here is a typical GC log trace from one of the yarn container logs:

54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)] 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)] 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)] 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)] 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)] 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)] 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs]

So ~9GB is getting GC'ed every few seconds, which seems like a lot.

Question: The filter() is removing 99% of the data. Does this 99% of the data get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of executor-cores (to 2), based on suggestions at http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html . This makes everything before reduceByKey() run pretty smoothly. I ran this with more executor-memory and fewer executors (the most important thing was fewer executor-cores):

--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \

But then, reduceByKey() fails with: java.lang.OutOfMemoryError: Java heap space

On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote:
The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility) could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey), which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts, before doing groupByKey.
On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote:
All stated symptoms are consistent with GC pressure (other nodes time out trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this).

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:
But groupByKey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupByKey phase? Are you using a profiler or do you base that assumption only on logs?

On Sat, 28 Feb 2015, 8:12 PM, Arun Luthra arun.lut...@gmail.com wrote:
A correction to my first post: There is also a repartition right before groupByKey to help avoid the too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote:
The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
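[For reference, a minimal, self-contained sketch of the skew check Arun describes above (count per key with reduceByKey, log/remove hot keys, then groupByKey). The input path, key extraction and the hot-key threshold are all placeholders.]

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions on older Spark releases

object SkewCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("skew-check"))

    // hypothetical input and key extraction; adjust to the real record format
    val keyed = sc.textFile("/data/input", 15000).map { line =>
      val fields = line.split(',')
      (fields(0), line)
    }

    // per-key counts via reduceByKey: far cheaper than groupByKey because counts
    // combine map-side instead of buffering all values for a key in memory
    val hotKeys = keyed.mapValues(_ => 1L).reduceByKey(_ + _)
      .filter { case (_, c) => c > 1000000L }   // arbitrary threshold
      .keys.collect().toSet
    val hotBc = sc.broadcast(hotKeys)

    // drop (or handle separately) the skewed keys before the expensive groupByKey
    val grouped = keyed.filter { case (k, _) => !hotBc.value.contains(k) }.groupByKey()
    grouped.saveAsTextFile("/data/output")

    sc.stop()
  }
}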
RE: Passing around SparkContext within the Driver
Replace

val sqlContext = new SQLContext(sparkContext)

with

@transient val sqlContext = new SQLContext(sparkContext)

-----Original Message-----
From: kpeng1 [mailto:kpe...@gmail.com]
Sent: 04 March 2015 23:39
To: user@spark.apache.org
Subject: Passing around SparkContext within the Driver

Hi All,

I am trying to create a class that wraps functionalities that I need; some of these functions require access to the SparkContext, which I would like to pass in. I know that the SparkContext is not serializable, and I am not planning on passing it to worker nodes or anything; I just want to wrap some functionality that requires the SparkContext's API. As a preface, I am basically using the spark shell to test the functionality of my code at the moment, so I am not sure if that plays into any of the issues I am having. Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
    "int" -> IntegerType,
    "double" -> DoubleType,
    "float" -> FloatType,
    "long" -> LongType,
    "short" -> ShortType,
    "binary" -> BinaryType,
    "bool" -> BooleanType,
    "byte" -> ByteType,
    "string" -> StringType)

  //removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] = {
    //header line is first line in first partition
    if (partitionIdx == 0) {
      fileItr.drop(1)
    }
    fileItr
  }

  //returns back a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType = {
    //returns back a StructField
    def getSchemaFieldHelper(schemaField: String): StructField = {
      val schemaParts = schemaField.split(' ')
      StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
    }
    val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
    StructType(structFields)
  }

  def getRow(strRow: String): Row = {
    val spRow = strRow.split(',')
    val tRow = spRow.map(_.trim)
    Row(tRow: _*)
  }

  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile: String): SchemaRDD = {
    //apply schema to rdd to create schemaRDD
    def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType): SchemaRDD = {
      val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
      schemaRDD
    }

    val rawSchema = sparkContext.textFile(schemaFile).collect
    val schema = getSchema(rawSchema)
    val rawCsvData = sparkContext.textFile(csvFile)

    //if we want to keep header from csv file
    if (includeHeader) {
      val rowRDD = rawCsvData.map(getRow)
      val schemaRDD = createSchemaRDD(rowRDD, schema)
      return schemaRDD
    }

    val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
    val rowRDD = csvData.map(getRow)
    val schemaRDD = createSchemaRDD(rowRDD, schema)
    schemaRDD
  }
}

So in the spark shell I am basically creating an instance of this class and calling applySchemaToCsv like so:

val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")

What I am getting is a "not serializable" exception:

15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at console:62
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
  .
  .
  .
Caused by: java.io.NotSerializableException:

If I remove the class wrapper and make references to sc directly, everything works.
I am basically wondering what is causing the serialization issues and whether I can wrap a class around these functions.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Passing-around-SparkContext-with-in-the-Driver-tp21913.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
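[For reference, a minimal sketch of a driver-side wrapper along these lines. The @transient fields follow Kapil's suggestion; copying values into local vals before using them in closures is the usual companion trick, so tasks never capture `this` (and therefore never try to serialize the SparkContext). The class and helper names are illustrative, and createDataFrame is the 1.3+ equivalent of the applySchema call used above.]

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.StructType

class CsvLoader(@transient val sc: SparkContext) extends Serializable {
  // @transient: never shipped to executors, recreated on the driver only
  @transient lazy val sqlContext = new SQLContext(sc)

  def load(csvFile: String, schema: StructType): DataFrame = {
    val localSchemaSize = schema.fields.length   // local copy used inside the closure
    val rows: RDD[Row] = sc.textFile(csvFile).map { line =>
      val cols = line.split(',').map(_.trim)
      Row(cols.take(localSchemaSize): _*)        // closure only captures localSchemaSize
    }
    sqlContext.createDataFrame(rows, schema)
  }
}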
RE: does calling cache()/persist() on a RDD trigger its immediate evaluation?
Hi Pengcheng YIN,

RDD cache / persist calls do not trigger evaluation. The unpersist call is blocking (it does have an async flavor, but I am not sure what the SLAs on its behavior are).

val rdd = sc.textFile().map()
rdd.persist() // This does not trigger actual storage
while (true) {
  val count = rdd.filter().count // this will trigger storage of the RDD, so far so good
  if (count == 0) break
  newRdd = /* some code that uses `rdd` several times, and produces a new RDD */
  rdd.unpersist() // This is immediate !! If newRdd has not been evaluated + stored yet, that's not good
  rdd = newRdd.persist() // this will do nothing till the next iteration of the loop (at count)
}

IMHO, the last 3 lines can be replaced with -

newRdd = /* some code that uses `rdd` several times, and produces a new RDD */
ADDED -- newRdd.persist( ) // mark for storage
ADDED -- newRdd.filter( ... ).count // trigger storage
rdd.unpersist()
rdd = newRdd

Although, others can correct me if I am mistaken. You can also verify this with a small dataset.

Thanks,
Kapil

-----Original Message-----
From: Pengcheng YIN [mailto:pcyin1...@gmail.com]
Sent: 04 January 2015 12:53
To: user@spark.apache.org
Subject: does calling cache()/persist() on a RDD trigger its immediate evaluation?

Hi Pro,

I have a question regarding calling cache()/persist() on an RDD. All RDDs in Spark are lazily evaluated, but does calling cache()/persist() on an RDD trigger its immediate evaluation? My spark app is something like this:

val rdd = sc.textFile().map()
rdd.persist()
while (true) {
  val count = rdd.filter().count
  if (count == 0) break
  newRdd = /* some code that uses `rdd` several times, and produces a new RDD */
  rdd.unpersist()
  rdd = newRdd.persist()
}

In each iteration, I persist `rdd`, and unpersist it at the end of the iteration, replacing `rdd` with the persisted `newRdd`. My concern is that, if the RDD is not evaluated and persisted when persist() is called, I need to change the position of the persist()/unpersist() calls to make it more efficient.

Thanks,
Pengcheng
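[A compilable sketch of the reordering suggested above: materialize the new RDD with a cheap action before unpersisting the old one, so each iteration reuses cached data. The input path, filter predicate and the `compute` step are placeholders.]

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def iterate(sc: SparkContext, compute: RDD[String] => RDD[String]): Unit = {
  var rdd = sc.textFile("/data/input").persist(StorageLevel.MEMORY_AND_DISK)
  var done = false
  while (!done) {
    if (rdd.filter(_.nonEmpty).count() == 0) {     // action: triggers caching of `rdd`
      done = true
    } else {
      val newRdd = compute(rdd).persist(StorageLevel.MEMORY_AND_DISK)
      newRdd.count()      // force evaluation while `rdd` is still cached
      rdd.unpersist()     // safe now: nothing still depends on the old cached data
      rdd = newRdd
    }
  }
}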
RE: FlatMapValues
Hi Sanjay,

I tried running your code on the spark shell, piece by piece -

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with
val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] - so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
  } else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable] - PROBLEM

I was not even able to apply flatMapValues, since the filtered RDD passed to it is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled correctly.

The following changes in your snippet make it work as intended -

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ("", "")
  }
}).filter(pair => pair._1.length() > 0)
  .flatMapValues(skus => skus.split('\t'))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like (025126,Chills), i.e. with opening and closing brackets "(" and ")". If you want to get rid of them, better do another map operation to map the pair to a String.

Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this
025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows, but the flatMapValues does not work even after I have created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
}).filter(line => line.toString.length() > 0)
  .flatMapValues(skus => skus.split('\t'))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

thanks

sanjay
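[Putting the corrections above together, a small self-contained sketch that also strips the tuple brackets with a final map. The sample line is the one from the thread; the app name and the fact that output is just printed are illustrative.]

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions such as flatMapValues (pre-1.3 import)

object ReacFlatten {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("reac-flatten").setMaster("local"))
    val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
    val reacRdd = sc.parallelize(Seq(line1))

    val out = reacRdd.map(_.split(','))
      .map { f =>
        if (f.length >= 11 && !f(0).contains("VAERS_ID"))
          (f(0), Seq(f(1), f(3), f(5), f(7), f(9)).mkString("\t"))
        else
          ("", "")
      }
      .filter(_._1.nonEmpty)
      .flatMapValues(_.split('\t'))
      .map { case (id, symptom) => s"$id,$symptom" }   // "025126,Chills" instead of "(025126,Chills)"

    out.collect().foreach(println)
    sc.stop()
  }
}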
RE: FlatMapValues
Hi Sanjay,

Oh yes .. on flatMapValues: it's defined in PairRDDFunctions, and you need to import org.apache.spark.SparkContext._ to use it (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions).

@Sean, yes, indeed flatMap / flatMapValues both can be used.

Regards,
Kapil

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues

From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples. Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out.

However it's still not quite what you want. Your input is basically [key value1 value2 value3], so you want to flatMap that to (key,value1) (key,value2) (key,value3). flatMapValues does not come into play.

On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com wrote:

My understanding is as follows

STEP 1 (This would create a pair RDD)
===
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
})

STEP 2
===
Since the previous step created a pair RDD, I thought the flatMapValues method would be applicable. But the code does not even compile, saying that flatMapValues is not applicable to the RDD :-(

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
}).flatMapValues(skus => skus.split('\t'))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

SUMMARY
===
When a dataset looks like the following
1,red,blue,green
2,yellow,violet,pink
I want to output the following, and I am asking how do I do that? Perhaps my code is 100% wrong. Please correct me and educate me :-)
1,red
1,blue
1,green
2,yellow
2,violet
2,pink
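[A sketch of the flatMap-only version Sean describes, assuming reacRdd is the RDD[String] from the snippets above; it emits one key,value line per value and never needs a pair RDD.]

val flattened = reacRdd.flatMap { line =>
  val f = line.split(',')
  if (f.length >= 11 && !f(0).contains("VAERS_ID"))
    Seq(f(1), f(3), f(5), f(7), f(9)).map(v => f(0) + "," + v)   // one output line per symptom
  else
    Seq.empty[String]
}
// produces 025126,Chills ... 025126,Myalgia directly, ready for saveAsTextFile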
RE: Fwd: Sample Spark Program Error
Hi Naveen,

Quoting http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext -
"SparkContext is the main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one."

So stop() shuts down the connection between the driver program and the Spark master, and does some cleanup. Indeed, after calling this, you cannot do any operation on it or on any RDD created via this context.

Regards,
Kapil

From: Naveen Madhire [mailto:vmadh...@umail.iu.edu]
Sent: 31 December 2014 22:08
To: RK
Cc: user@spark.apache.org
Subject: Re: Fwd: Sample Spark Program Error

Yes. The exception is gone now after adding stop() at the end. Can you please tell me what this stop() does at the end? Does it disable the Spark context?

On Wed, Dec 31, 2014 at 10:09 AM, RK prk...@yahoo.com wrote:
If you look at your program output closely, you can see the following output.
Lines with a: 24, Lines with b: 15
The exception seems to be happening with Spark cleanup after executing your code. Try adding sc.stop() at the end of your program to see if the exception goes away.

On Wednesday, December 31, 2014 6:40 AM, Naveen Madhire vmadh...@umail.iu.edu wrote:

Hi All,

I am trying to run a sample Spark program using Scala SBT. Below is the program:

def main(args: Array[String]) {
  val logFile = "E:/ApacheSpark/usb/usb/spark/bin/README.md" // Should be some file on your system
  val sc = new SparkContext("local", "Simple App", "E:/ApacheSpark/usb/usb/spark/bin",
    List("target/scala-2.10/sbt2_2.10-1.0.jar"))
  val logData = sc.textFile(logFile, 2).cache()
  val numAs = logData.filter(line => line.contains("a")).count()
  val numBs = logData.filter(line => line.contains("b")).count()
  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}

Below is the error log:

14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:0+673
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2032) called with curMem=34047, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 2032.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_0 in memory on zealot:61452 (size: 2032.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_0
14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1264 bytes)
14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
14/12/30 23:20:21 INFO spark.CacheManager: Partition rdd_1_1 not found, computing it
14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:673+673
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3507 ms on localhost (1/2)
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(1912) called with curMem=36079, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_1 stored as values in memory (estimated size 1912.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_1 in memory on zealot:61452 (size: 1912.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_1
14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 261 ms on localhost (2/2)
14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Stage 0 (count at Test1.scala:19) finished in 3.811 s
14/12/30 23:20:21 INFO spark.SparkContext: Job finished: count at Test1.scala:19, took 3.997365232 s
14/12/30 23:20:21 INFO spark.SparkContext: Starting job: count at Test1.scala:20
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Got job 1 (count at Test1.scala:20) with 2 output partitions (allowLocal=false)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at Test1.scala:20)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Missing parents: List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting Stage 1 (FilteredRDD[3] at filter at Test1.scala:20), which has no missing parents
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2600) called with curMem=37991, maxMem=280248975
14/12/30
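[For completeness, a sketch of the program with the suggested sc.stop() added; wrapping the body in try/finally ensures the cleanup also runs if the job throws. The master and missing jar/home arguments are simplified relative to the original.]

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "E:/ApacheSpark/usb/usb/spark/bin/README.md" // should be some file on your system
    val sc = new SparkContext(new SparkConf().setAppName("Simple App").setMaster("local"))
    try {
      val logData = sc.textFile(logFile, 2).cache()
      val numAs = logData.filter(_.contains("a")).count()
      val numBs = logData.filter(_.contains("b")).count()
      println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    } finally {
      sc.stop() // releases the cluster connection and runs driver-side cleanup exactly once
    }
  }
}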
RE: Determination of number of RDDs
Regarding "Can we create such an array and then parallelize it?" -
Parallelizing an array of RDDs, i.e. RDD[RDD[x]], is not possible. RDD is not serializable.

From: Deep Pradhan [mailto:pradhandeep1...@gmail.com]
Sent: 04 December 2014 15:39
To: user@spark.apache.org
Subject: Determination of number of RDDs

Hi,

I have a graph and I want to create RDDs equal in number to the nodes in the graph. How can I do that? If I have 10 nodes then I want to create 10 RDDs. Is that possible in GraphX? Like in the C language we have an array of pointers. Do we have an array of RDDs in Spark? Can we create such an array and then parallelize it?

Thank You
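[A small sketch of the distinction above: a plain Array[RDD[_]] can live in the driver, but RDDs cannot be nested inside another RDD, so sc.parallelize over RDDs will not work. The names and the keyed alternative are illustrative only.]

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PerNodeRdds {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-node-rdds"))
    val nodeIds = 0 until 10

    // fine: a plain driver-side array whose elements are RDDs
    val perNode: Array[RDD[Int]] = nodeIds.map(id => sc.parallelize(Seq(id))).toArray

    // usually better: one keyed RDD instead of many small RDDs
    val keyed: RDD[(Int, Int)] = sc.parallelize(nodeIds.map(id => (id, id)))

    // NOT possible: sc.parallelize(perNode) - an RDD of RDDs cannot be computed on executors
    println(perNode.length + " " + keyed.count())
    sc.stop()
  }
}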
RE: Snappy error with Spark SQL
Hi,

Try adding this in spark-env.sh:

export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/usr/lib/hadoop-0.20-mapreduce/lib/native/Linux-amd64-64
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/hadoop-0.20-mapreduce/lib/native/Linux-amd64-64
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/usr/lib/hadoop-0.20-mapreduce/lib/native/Linux-amd64-64
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/lib/hadoop-0.20-mapreduce/lib/snappy-java-1.0.4.1.jar

pointing to the equivalent Snappy / MapReduce directories on your box.

Thanks,
Kapil Malik

From: Naveen Kumar Pokala [mailto:npok...@spcapitaliq.com]
Sent: 12 November 2014 19:59
To: user@spark.apache.org
Subject: Snappy error with Spark SQL

HI,

I am facing the following problem when I am trying to save my RDD as a parquet file:

14/11/12 07:43:59 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 48,): org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
  org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:236)
  org.xerial.snappy.Snappy.<clinit>(Snappy.java:48)
  parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:64)
  org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
  org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
  parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:109)
  parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:70)
  parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
  parquet.column.impl.ColumnWriterImpl.flush(ColumnWriterImpl.java:199)
  parquet.column.impl.ColumnWriteStoreImpl.flush(ColumnWriteStoreImpl.java:108)
  parquet.hadoop.InternalParquetRecordWriter.flushStore(InternalParquetRecordWriter.java:146)
  parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:110)
  parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
  org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:305)
  org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
  org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
  org.apache.spark.scheduler.Task.run(Task.scala:54)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  java.lang.Thread.run(Thread.java:745)

14/11/12 07:43:59 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 1.0 (TID 51,): java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
  parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:64)
  org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
  org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
  parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:109)
  parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:70)
  parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
  parquet.column.impl.ColumnWriterImpl.flush(ColumnWriterImpl.java:199)
  parquet.column.impl.ColumnWriteStoreImpl.flush(ColumnWriteStoreImpl.java:108)
  parquet.hadoop.InternalParquetRecordWriter.flushStore(InternalParquetRecordWriter.java:146)
  parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:110)
  parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
  org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:305)
  org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
  org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
  org.apache.spark.scheduler.Task.run(Task.scala:54)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  java.lang.Thread.run(Thread.java:745)

Please help me.

Regards,
Naveen.
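[As a side note, a sketch of the same idea expressed as Spark properties rather than spark-env.sh exports, in case the job is launched through spark-submit or the SparkConf is built in code. The paths are the example locations from the mail above; whether the extra classpath entry is needed depends on how snappy-java is already packaged with your build.]

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("parquet-save")
  .set("spark.driver.extraLibraryPath",
       "/usr/lib/hadoop-0.20-mapreduce/lib/native/Linux-amd64-64")   // native snappy libs
  .set("spark.executor.extraLibraryPath",
       "/usr/lib/hadoop-0.20-mapreduce/lib/native/Linux-amd64-64")
  .set("spark.executor.extraClassPath",
       "/usr/lib/hadoop-0.20-mapreduce/lib/snappy-java-1.0.4.1.jar") // snappy-java on executors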
RE: Help with processing multiple RDDs
Hi,

How is the 78g distributed across driver, daemon and executor?
Can you please paste the logs regarding "I don't have enough memory to hold the data in memory"?
Are you collecting any data in the driver?
Lastly, did you try doing a re-partition to create smaller and evenly distributed partitions?

Regards,
Kapil

-----Original Message-----
From: akhandeshi [mailto:ami.khande...@gmail.com]
Sent: 12 November 2014 03:44
To: u...@spark.incubator.apache.org
Subject: Help with processing multiple RDDs

I have been struggling to process a set of RDDs. Conceptually, it is not a large data set. It seems that no matter how much memory I provide to the JVM or how I partition, I can't process this data. I am caching the RDD. I have tried persist(disk and memory), persist(memory) and persist(off_heap) with no success. Currently I am giving 78g each to my driver, daemon and executor memory.

Currently, it seems to have trouble with one of the largest partitions, rdd_22_29, which is 25.9 GB. The metrics page shows Summary Metrics for 29 Completed Tasks. However, I don't see a few partitions in the list below. I do see warnings in the log file indicating that I don't have enough memory to hold the data in memory. I don't understand what I am doing wrong or how I can troubleshoot it. Any pointers will be appreciated...

14/11/11 21:28:45 WARN CacheManager: Not enough space to cache partition rdd_22_20 in memory! Free memory is 17190150496 bytes.
14/11/11 21:29:27 WARN CacheManager: Not enough space to cache partition rdd_22_13 in memory! Free memory is 17190150496 bytes.

Block Name | Storage Level                     | Size in Memory | Size on Disk | Executors
rdd_22_0   | Memory Deserialized 1x Replicated | 2.1 MB         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_10  | Memory Deserialized 1x Replicated | 7.0 GB         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_11  | Memory Deserialized 1x Replicated | 1290.2 MB      | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_12  | Memory Deserialized 1x Replicated | 1167.7 KB      | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_14  | Memory Deserialized 1x Replicated | 3.8 GB         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_15  | Memory Deserialized 1x Replicated | 4.0 MB         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_16  | Memory Deserialized 1x Replicated | 2.4 GB         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_17  | Memory Deserialized 1x Replicated | 37.6 MB        | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_18  | Memory Deserialized 1x Replicated | 120.9 MB       | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_19  | Memory Deserialized 1x Replicated | 755.9 KB       | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_2   | Memory Deserialized 1x Replicated | 289.5 KB       | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_21  | Memory Deserialized 1x Replicated | 11.9 KB        | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_22  | Memory Deserialized 1x Replicated | 24.0 B         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_23  | Memory Deserialized 1x Replicated | 24.0 B         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_24  | Memory Deserialized 1x Replicated | 3.0 MB         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_25  | Memory Deserialized 1x Replicated | 24.0 B         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_26  | Memory Deserialized 1x Replicated | 4.0 GB         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_27  | Memory Deserialized 1x Replicated | 24.0 B         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_28  | Memory Deserialized 1x Replicated | 1846.1 KB      | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_29  | Memory Deserialized 1x Replicated | 25.9 GB        | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_3   | Memory Deserialized 1x Replicated | 267.1 KB       | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_4   | Memory Deserialized 1x Replicated | 24.0 B         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_5   | Memory Deserialized 1x Replicated | 24.0 B         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_6   | Memory Deserialized 1x Replicated | 24.0 B         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_7   | Memory Deserialized 1x Replicated | 14.8 KB        | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_8   | Memory Deserialized 1x Replicated | 24.0 B         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974
rdd_22_9   | Memory Deserialized 1x Replicated | 24.0 B         | 0.0 B        | mddworker.c.fi-mdd-poc.internal:54974

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-processing-multiple-RDDs-tp18628.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
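[A small sketch of the two suggestions above, re-partitioning to break up the 25.9 GB block into smaller, evenly sized pieces and letting the cache spill to disk instead of dropping blocks. `rawRdd` and the partition count are placeholders for whatever the job actually builds.]

import org.apache.spark.storage.StorageLevel

val balanced = rawRdd
  .repartition(512)                           // pick based on data size and available cores
  .persist(StorageLevel.MEMORY_AND_DISK_SER)  // spill partitions that don't fit, rather than failing to cache
balanced.count()                              // materialize once, then reuse across the job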
RE: unsubscribe
Ohh! I thought you're unsubscribing :)

Kapil Malik | kma...@adobe.com | 33430 / 8800836581

-----Original Message-----
From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: 12 March 2014 00:51
To: user@spark.apache.org
Subject: Re: unsubscribe

To unsubscribe from this list, please send a message to user-unsubscr...@spark.apache.org and it will automatically unsubscribe you.

Matei

On Mar 11, 2014, at 12:15 PM, Abhishek Pratap apra...@sagebase.org wrote: