Re: Catalog, SessionCatalog and ExternalCatalog in spark 2.0

2016-09-03 Thread Kapil Malik
Thanks Raghavendra :)
Will look into Analyzer as well.


Kapil Malik
*Sr. Principal Engineer | Data Platform, Technology*
M: +91 8800836581 | T: 0124-433 | EXT: 20910
ASF Centre A | 1st Floor | Udyog Vihar Phase IV |
Gurgaon | Haryana | India



On Sat, Sep 3, 2016 at 7:27 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> Kapil -- I'm afraid you need to plug in your own SessionCatalog, as the
> ResolveRelations rule depends on it. To keep the design consistent, you
> may also want to implement an ExternalCatalog.
> You can also look at plugging in your own Analyzer class to give you more
> flexibility. Ultimately that is where all relations get resolved via the
> SessionCatalog.
>
> On Sat, Sep 3, 2016 at 5:49 PM, Kapil Malik <kapil.ma...@snapdeal.com>
> wrote:
>
>> Hi all,
>>
>> I have a Spark SQL 1.6 application in production which does the following
>> on executing sqlContext.sql(...) -
>> 1. Identify the table name mentioned in the query
>> 2. Use an external database to decide where the data is located and in
>> which format (parquet, csv, jdbc etc.)
>> 3. Load the DataFrame
>> 4. Register it as a temp table (for future calls to this table)
>>
>> This is achieved by extending HiveContext and, correspondingly,
>> HiveCatalog. I have my own implementation of the trait "Catalog", which
>> overrides the "lookupRelation" method to do the magic behind the scenes.
>>
>> However, in Spark 2.0, I can see the following -
>> SessionCatalog - which contains a lookupRelation method, but doesn't have
>> any interface / abstract class for it.
>> ExternalCatalog - which deals with CatalogTable instead of DataFrame /
>> LogicalPlan.
>> Catalog - which also doesn't expose any method to look up a DataFrame /
>> LogicalPlan.
>>
>> So apparently it looks like I need to extend SessionCatalog only.
>> However, I just wanted feedback on whether there's a better / recommended
>> approach to achieve this.
>>
>>
>> Thanks and regards,
>>
>>
>> Kapil Malik
>> *Sr. Principal Engineer | Data Platform, Technology*
>> M: +91 8800836581 | T: 0124-433 | EXT: 20910
>> ASF Centre A | 1st Floor | Udyog Vihar Phase IV |
>> Gurgaon | Haryana | India
>>
>>
>>
>


Catalog, SessionCatalog and ExternalCatalog in spark 2.0

2016-09-03 Thread Kapil Malik
Hi all,

I have a Spark SQL 1.6 application in production which does the following on
executing sqlContext.sql(...) -
1. Identify the table name mentioned in the query
2. Use an external database to decide where the data is located and in which
format (parquet, csv, jdbc etc.)
3. Load the DataFrame
4. Register it as a temp table (for future calls to this table)

This is achieved by extending HiveContext and, correspondingly, HiveCatalog.
I have my own implementation of the trait "Catalog", which overrides the
"lookupRelation" method to do the magic behind the scenes.

However, in Spark 2.0, I can see the following -
SessionCatalog - which contains a lookupRelation method, but doesn't have any
interface / abstract class for it.
ExternalCatalog - which deals with CatalogTable instead of DataFrame / LogicalPlan.
Catalog - which also doesn't expose any method to look up a DataFrame / LogicalPlan.

So apparently it looks like I need to extend SessionCatalog only.
However, I just wanted feedback on whether there's a better / recommended
approach to achieve this.
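
For concreteness, this is roughly the shape of what I have in mind - a minimal,
untested sketch against the Spark 2.0 internal APIs (the constructor parameters
and the lookupRelation signature are as I read them in the 2.0 sources; they may
differ across 2.x releases, so please treat this as illustrative only):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, FunctionResourceLoader, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

class MetadataAwareSessionCatalog(
    externalCatalog: ExternalCatalog,
    functionResourceLoader: FunctionResourceLoader,
    functionRegistry: FunctionRegistry,
    conf: CatalystConf,
    hadoopConf: Configuration)
  extends SessionCatalog(externalCatalog, functionResourceLoader,
    functionRegistry, conf, hadoopConf) {

  override def lookupRelation(name: TableIdentifier,
                              alias: Option[String]): LogicalPlan = {
    // 1. consult the external metadata database for the table's location / format
    // 2. load and register the relation (parquet / csv / jdbc) if not known yet
    // 3. then fall back to the default resolution
    super.lookupRelation(name, alias)
  }
}

(Wiring this into the SessionState so that the Analyzer actually uses it is a
separate step, which is presumably where plugging in a custom Analyzer, as
suggested in the reply above, comes in.)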


Thanks and regards,


Kapil Malik
*Sr. Principal Engineer | Data Platform, Technology*
M: +91 8800836581 | T: 0124-433 | EXT: 20910
ASF Centre A | 1st Floor | Udyog Vihar Phase IV |
Gurgaon | Haryana | India



Design query regarding dataframe usecase

2016-01-11 Thread Kapil Malik
Hi,

We have an analytics use case where we are collecting user click logs. The
data can be considered hierarchical, with 3 types of logs -
User (attributes like userId, emailId)
- Session (attributes like sessionId, device, OS, browser, city etc.)
- - PageView (attributes like url, referrer, page-type etc.)

userId is present in every Session and PageView log.
sessionId is present in every PageView log.

*Objective*: To store the data in a queryable format so that we can run queries like -
"select city, count(distinct userId), count(distinct sessionId), count(*) group by city"
i.e. get the number of unique users, sessions and pageviews per city.
This is just an example, but you get the idea that a query may span the
hierarchical levels of the data, and should allow for select, aggregate,
group-by and where clauses.

*RDD approach:*
* Read all the logs as a single RDD
* Convert it to a pair RDD
* GroupByKey on the RDD
* Shift the key and flat-map to a new pair RDD

As I understand, with Catalyst and Tungsten, DataFrames are a lot more
optimized than vanilla RDDs. So can anyone suggest how I can support such
queries using DataFrames (purely, or at least more optimally)?

One way I thought of was to create different DataFrames for the user,
session and page-view logs. However, I would then have to join them, and the
join will cause a network shuffle.

If I keep all the logs originally partitioned on user, read them as a
single DataFrame, split it into 2 DataFrames (based on log type = session and
log type = pageview), and then do a join, will it still cause a network shuffle?
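
To make the question concrete, here is a rough sketch of what I mean (the
column names, the logType values and the input path are illustrative):

import org.apache.spark.sql.functions._

val logs = sqlContext.read.parquet("/path/to/clicklogs") // one source with a logType column

val sessions  = logs.filter(col("logType") === "session")
                    .select("sessionId", "userId", "city")
val pageViews = logs.filter(col("logType") === "pageview")
                    .select("sessionId", "url")

// join the page views back to their session, then aggregate per city
val joined = pageViews.join(sessions, "sessionId")
val perCity = joined.groupBy("city")
                    .agg(countDistinct("userId").as("users"),
                         countDistinct("sessionId").as("sessions"),
                         count(lit(1)).as("pageViews"))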


Thanks,

Kapil


RE: Problem getting program to run on 15TB input

2015-06-06 Thread Kapil Malik
Very interesting and relevant thread for production-level usage of Spark.

@Arun, can you kindly confirm whether Daniel's suggestion helped your use case?

Thanks,

Kapil Malik | kma...@adobe.com | 33430 / 8800836581

From: Daniel Mahler [mailto:dmah...@gmail.com]
Sent: 13 April 2015 15:42
To: Arun Luthra
Cc: Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
Subject: Re: Problem getting program to run on 15TB input

Sometimes a large number of partitions leads to memory problems.
Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.


On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
Everything works smoothly if I do the 99%-removal filter in Hive first. So, all 
the baggage from garbage collection was breaking it.

Is there a way to filter() out 99% of the data without having to garbage 
collect 99% of the RDD?

On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
I tried a shorter, simpler version of the program, with just 1 RDD; essentially
it is:

sc.textFile(..., N).map().filter().map( blah => (id,
1L)).reduceByKey().saveAsTextFile(...)

Here is a typical GC log trace from one of the yarn container logs:

54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)] 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)] 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)] 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)] 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)] 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)] 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs]

So ~9 GB is getting GC'ed every few seconds, which seems like a lot.

Question: The filter() is removing 99% of the data. Does this 99% of the data 
get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of 
executor-cores (to 2), based on suggestions at 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.

I ran this with more executor memory and fewer executors (the most important
thing was fewer executor-cores):

--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \

But then, reduceByKey() fails with:

java.lang.OutOfMemoryError: Java heap space




On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
The Spark UI names the line number and name of the operation (repartition in 
this case) that it is performing. Only if this information is wrong (just a 
possibility), could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey (or 
simply countByKey) which is relatively inexpensive. For the purposes of this 
algorithm I can simply log and remove keys with huge counts, before doing 
groupByKey.
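
A minimal sketch of that skew check (the pair RDD name and the threshold are
illustrative; countByKey brings a Map[key, count] back to the driver, so it is
only safe when the number of distinct keys is modest):

val keyCounts = pairRdd.countByKey()
val heavyKeys = keyCounts.filter { case (_, c) => c > 1000000 }.keySet
heavyKeys.foreach(k => println(s"Dropping skewed key: $k"))

val heavyKeysBc = sc.broadcast(heavyKeys)
val trimmed = pairRdd.filter { case (k, _) => !heavyKeysBc.value.contains(k) }
// ... then repartition + groupByKey on `trimmed` as before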

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson <ilike...@gmail.com> wrote:
All stated symptoms are consistent with GC pressure (other nodes timeout trying 
to connect because of a long stop-the-world), quite possibly due to groupByKey. 
groupByKey is a very expensive operation as it may bring all the data for a 
particular partition into memory (in particular, it cannot spill values for a 
single key, so if you have a single very skewed key you can get behavior like 
this).

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:

But groupByKey will repartition according to the number of keys, as I understand
how it works. How do you know that you haven't reached the groupByKey phase? Are
you using a profiler or do you base that assumption only on logs?

On Sat, 28 Feb 2015 at 8:12 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

A correction to my first post:

There is also a repartition right before groupByKey to help avoid a
too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
The job fails before getting to groupByKey.

I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1

RE: Passing around SparkContext with in the Driver

2015-03-04 Thread Kapil Malik
Replace 
val sqlContext = new SQLContext(sparkContext)
with

@transient 
val sqlContext = new SQLContext(sparkContext)
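
For context: passing the method removeHeader into mapPartitionsWithIndex (in the
code below) turns it into a closure over `this`, i.e. the whole MyClass instance,
which drags the SparkContext / SQLContext fields into serialization. Marking the
non-serializable fields @transient is one way out; an alternative, untested
sketch is to keep the closure free of `this` altogether:

// inside applySchemaToCsv, instead of rawCsvData.mapPartitionsWithIndex(removeHeader)
val dropHeader = (partitionIdx: Int, fileItr: Iterator[String]) =>
  if (partitionIdx == 0) fileItr.drop(1) else fileItr
val csvData = rawCsvData.mapPartitionsWithIndex(dropHeader)

A locally defined function value like dropHeader references no members of
MyClass, so nothing non-serializable gets captured.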

-Original Message-
From: kpeng1 [mailto:kpe...@gmail.com] 
Sent: 04 March 2015 23:39
To: user@spark.apache.org
Subject: Passing around SparkContext with in the Driver

Hi All,

I am trying to create a class that wraps functionalities that I need; some of 
these functions require access to the SparkContext, which I would like to pass 
in.  I know that the SparkContext is not serializable, and I am not planning on 
passing it to worker nodes or anything, I just want to wrap some 
functionalities that require SparkContext's api.  As a preface, I am basically 
using the spark shell to test the functionality of my code at the moment, so I 
am not sure if that plays into any of the issues I am having. 
Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
    "int" -> IntegerType,
    "double" -> DoubleType,
    "float" -> FloatType,
    "long" -> LongType,
    "short" -> ShortType,
    "binary" -> BinaryType,
    "bool" -> BooleanType,
    "byte" -> ByteType,
    "string" -> StringType)

  //removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]):
Iterator[String] ={
//header line is first line in first partition
if(partitionIdx == 0){
  fileItr.drop(1)
}
fileItr
  }

  //returns back a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType ={
//return backs a StructField
def getSchemaFieldHelper(schemaField: String): StructField ={
  val schemaParts = schemaField.split(' ')
  StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
}

    val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
StructType(structFields)
  }

  def getRow(strRow: String): Row ={
val spRow = strRow.split(',')
val tRow = spRow.map(_.trim)
Row(tRow:_*)
  }
  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile:
String): SchemaRDD ={
//apply schema to rdd to create schemaRDD
def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType):
SchemaRDD ={
  val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
  schemaRDD
}
  
val rawSchema = sparkContext.textFile(schemaFile).collect
val schema = getSchema(rawSchema)
  
val rawCsvData = sparkContext.textFile(csvFile)
  
//if we want to keep header from csv file
if(includeHeader){
  val rowRDD = rawCsvData.map(getRow) 
  val schemaRDD = createSchemaRDD(rowRDD, schema)
  return schemaRDD
}
 
val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
val rowRDD = csvData.map(getRow)
val schemaRDD = createSchemaRDD(rowRDD, schema)
schemaRDD
  }

}

So in the spark shell I am basically creating an instance of this class and 
calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")

What I am getting is a not-serializable exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at
<console>:62
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
   .
   .
   .
Caused by: java.io.NotSerializableException:


If I remove the class wrapper and make references to sc directly everything 
works.  I am basically wondering what is causing the serialization issues and 
if I can wrap a class around these functions.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Passing-around-SparkContext-with-in-the-Driver-tp21913.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org





RE: does calling cache()/persist() on a RDD trigger its immediate evaluation?

2015-01-04 Thread Kapil Malik
Hi Pengcheng YIN,
RDD cache / persist calls do not trigger evaluation.
The unpersist call is blocking by default (it does have an async flavour, but I
am not sure what guarantees it offers on timing).

val rdd = sc.textFile().map()
rdd.persist() // This does not trigger actual storage
while(true){
val count = rdd.filter().count // this will trigger storage of RDD, so far 
so good
if(count == 0)
break

newRdd = /* some code that uses `rdd` several times, and produces a new RDD
*/
rdd.unpersist() // This takes effect immediately !! If newRdd has not been evaluated +
stored yet, that's not good
rdd = newRdd.persist() // this will do nothing till the next iteration of the loop
(at count).
}

IMHO, the last 3 lines can be replaced with -

newRdd = /* some code that uses `rdd` several times, and produces a new RDD
*/
ADDED --   newRdd.persist( ) // mark for storage
ADDED --   newRdd.filter( ... ).count // trigger storage
rdd.unpersist()
rdd = newRdd

Although, others can correct me if I am mistaken. You can also verify this with a
small dataset.
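
For example, a quick check in spark-shell (Spark 1.x; getRDDStorageInfo is a
developer API, so treat this as a sketch and double-check on your version):

val rdd = sc.parallelize(1 to 1000).map(_ * 2)
rdd.persist()
sc.getRDDStorageInfo    // expect no cached blocks reported yet
rdd.count()
sc.getRDDStorageInfo    // the RDD's blocks should appear only after the action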

Thanks,

Kapil 
-Original Message-
From: Pengcheng YIN [mailto:pcyin1...@gmail.com] 
Sent: 04 January 2015 12:53
To: user@spark.apache.org
Subject: does calling cache()/persist() on a RDD trigger its immediate 
evaluation?

Hi Pro,

I have a question regarding calling cache()/persist() on an RDD. All RDDs in
Spark are lazily evaluated, but does calling cache()/persist() on an RDD trigger
its immediate evaluation?

My spark app is something like this:

val rdd = sc.textFile().map()
rdd.persist()
while(true){
val count = rdd.filter().count
if(count == 0)
break

newRdd = /* some code that uses `rdd` several times, and produces a new RDD
*/
rdd.unpersist()
rdd = newRdd.persist()
}

In each iteration, I persist `rdd`, unpersist it at the end of the iteration, and
replace `rdd` with the persisted `newRdd`. My concern is that, if the RDD is not
evaluated and persisted at the point where persist() is called, I need to change
where persist()/unpersist() are called to make this more efficient.

Thanks,
Pengcheng




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org





RE: FlatMapValues

2014-12-31 Thread Kapil Malik
Hi Sanjay,

I tried running your code on spark shell piece by piece –

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site
reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site
reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin
with

val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] - so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {

(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
 // Returns a pair (String, String), good
  }
  else {
    "" // Returns a String, bad
  }
  }) // RDD[Serializable] - PROBLEM

I was not even able to apply flatMapValues since the filtered RDD passed to it 
is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled 
correctly.


The following changes in your snippet make it work as intended -


reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {

(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ("","")
  }
  }).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like (025126,Chills), i.e. with opening
and closing brackets. If you want to get rid of them, it is better to do one more
map operation that converts each pair to a String, for example:
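
A minimal sketch of that last step (untested) - drop the saveAsTextFile call from
the snippet above and append:

  .map { case (id, symptom) => id + "," + symptom }
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

so that each output line becomes 025126,Chills rather than (025126,Chills).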

Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site 
reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia


My code is as follows but the flatMapValues does not work even after I have 
created the pair RDD.



reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {

(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
  }).filter(line => line.toString.length() > 0).flatMapValues(skus =>
skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)



thanks

sanjay


RE: FlatMapValues

2014-12-31 Thread Kapil Malik
Hi Sanjay,

Oh yes .. flatMapValues is defined in PairRDDFunctions, and you need to
import org.apache.spark.SparkContext._ to pick up the implicit conversion that
makes it available
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
 )

@Sean, yes indeed flatMap / flatMapValues both can be used.

Regards,

Kapil 

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues

From the clarification below, the problem is that you are calling 
flatMapValues, which is only available on an RDD of key-value tuples.
Your map function returns a tuple in one case but a String in the other, so 
your RDD is a bunch of Any, which is not at all what you want. You need to 
return a tuple in both cases, which is what Kapil pointed out.

However it's still not quite what you want. Your input is basically [key value1 
value2 value3] so you want to flatMap that to (key,value1)
(key,value2) (key,value3). flatMapValues does not come into play.
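
For example, a direct flatMap version might look like this (an untested sketch;
the column positions come from the sample rows below, and the output path /
outFile are reused from earlier in the thread):

reacRdd.map(_.split(','))
  .filter(fields => fields.length >= 11 && !fields(0).contains("VAERS_ID"))
  .flatMap(fields => Seq(fields(1), fields(3), fields(5), fields(7), fields(9))
    .map(symptom => fields(0) + "," + symptom))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)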

On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian 
sanjaysubraman...@yahoo.com wrote:
 My understanding is as follows

 STEP 1 (This would create a pair RDD)
 ===

 reacRdd.map(line => line.split(',')).map(fields => {
   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {

 (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
   }
   else {
 ""
   }
   })

 STEP 2
 ===
 Since the previous step created a pair RDD, I thought the flatMapValues method
 would be applicable.
 But the code does not even compile, saying that flatMapValues is not
 applicable to an RDD :-(


 reacRdd.map(line => line.split(',')).map(fields => {
   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {

 (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
   }
   else {
 ""
   }
   }).flatMapValues(skus =>
 skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)


 SUMMARY
 ===
 when a dataset looks like the following

 1,red,blue,green
 2,yellow,violet,pink

 I want to output the following and I am asking how do I do that ? 
 Perhaps my code is 100% wrong. Please correct me and educate me :-)

 1,red
 1,blue
 1,green
 2,yellow
 2,violet
 2,pink

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org





RE: Fwd: Sample Spark Program Error

2014-12-31 Thread Kapil Malik
Hi Naveen,

Quoting
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext -
"SparkContext is the main entry point for Spark functionality. A SparkContext
represents the connection to a Spark cluster, and can be used to create RDDs,
accumulators and broadcast variables on that cluster.

Only one SparkContext may be active per JVM. You must stop() the active
SparkContext before creating a new one."

So stop() shuts down the connection between the driver program and the Spark
master, and does some cleanup. Indeed, after calling it, you cannot perform any
operation on the context or on any RDD created via it.
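
A minimal sketch of the usual pattern (applied to the program further down in
this thread; the constructor arguments are whatever you already pass):

val sc = new SparkContext(...)
try {
  // run the jobs: textFile, filter, count, println, ...
} finally {
  sc.stop()   // always release the cluster connection and clean up, even on failure
}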

Regards,

Kapil

From: Naveen Madhire [mailto:vmadh...@umail.iu.edu]
Sent: 31 December 2014 22:08
To: RK
Cc: user@spark.apache.org
Subject: Re: Fwd: Sample Spark Program Error

Yes. The exception is gone now after adding stop() at the end.
Can you please tell me what this stop() does at the end? Does it disable the
Spark context?

On Wed, Dec 31, 2014 at 10:09 AM, RK <prk...@yahoo.com> wrote:
If you look at your program output closely, you can see the following output.
Lines with a: 24, Lines with b: 15

The exception seems to be happening with Spark cleanup after executing your 
code. Try adding sc.stop() at the end of your program to see if the exception 
goes away.



On Wednesday, December 31, 2014 6:40 AM, Naveen Madhire <vmadh...@umail.iu.edu> wrote:


Hi All,

I am trying to run a sample Spark program using Scala SBT,

Below is the program,

def main(args: Array[String]) {

  val logFile = "E:/ApacheSpark/usb/usb/spark/bin/README.md" // Should be
some file on your system
  val sc = new SparkContext("local", "Simple App",
"E:/ApacheSpark/usb/usb/spark/bin", List("target/scala-2.10/sbt2_2.10-1.0.jar"))
  val logData = sc.textFile(logFile, 2).cache()

  val numAs = logData.filter(line => line.contains("a")).count()
  val numBs = logData.filter(line => line.contains("b")).count()

  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

}


Below is the error log,


14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: 
file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:0+673
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2032) called with 
curMem=34047, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_0 stored as values in 
memory (estimated size 2032.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_0 in memory on 
zealot:61452 (size: 2032.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_0
14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 
0). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 
(TID 1, localhost, PROCESS_LOCAL, 1264 bytes)
14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
14/12/30 23:20:21 INFO spark.CacheManager: Partition rdd_1_1 not found, 
computing it
14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: 
file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:673+673
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 
(TID 0) in 3507 ms on localhost (1/2)
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(1912) called with 
curMem=36079, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_1 stored as values in 
memory (estimated size 1912.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_1 in memory on 
zealot:61452 (size: 1912.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_1
14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 
1). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 
(TID 1) in 261 ms on localhost (2/2)
14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose 
tasks have all completed, from pool
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Stage 0 (count at 
Test1.scala:19) finished in 3.811 s
14/12/30 23:20:21 INFO spark.SparkContext: Job finished: count at 
Test1.scala:19, took 3.997365232 s
14/12/30 23:20:21 INFO spark.SparkContext: Starting job: count at Test1.scala:20
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Got job 1 (count at 
Test1.scala:20) with 2 output partitions (allowLocal=false)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at 
Test1.scala:20)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Missing parents: List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting Stage 1 
(FilteredRDD[3] at filter at Test1.scala:20), which has no missing parents
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2600) called with 
curMem=37991, maxMem=280248975
14/12/30 

RE: Determination of number of RDDs

2014-12-04 Thread Kapil Malik
Regarding: "Can we create such an array and then parallelize it?"

Parallelizing an array of RDDs - i.e. an RDD[RDD[X]] - is not possible:
an RDD is not serializable, and nested RDDs are not supported.
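
What you can have is a plain driver-side collection of RDDs - a rough sketch
(whether you actually want one RDD per graph node is a separate question, since
thousands of tiny RDDs are usually better modelled as a single keyed RDD):

// an Array[RDD[Int]] living on the driver is fine; one RDD per node
val rdds: Array[org.apache.spark.rdd.RDD[Int]] =
  (0 until 10).map(i => sc.parallelize(Seq(i))).toArray

// but this is NOT valid -- it would be an RDD[RDD[Int]]:
// val nested = sc.parallelize(rdds)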

From: Deep Pradhan [mailto:pradhandeep1...@gmail.com]
Sent: 04 December 2014 15:39
To: user@spark.apache.org
Subject: Determination of number of RDDs

Hi,
I have a graph and I want to create RDDs equal in number to the nodes in the
graph. How can I do that?
If I have 10 nodes then I want to create 10 RDDs. Is that possible in GraphX?
Like in C, where we have arrays of pointers: do we have arrays of RDDs in Spark?
Can we create such an array and then parallelize it?

Thank You


RE: Snappy error with Spark SQL

2014-11-12 Thread Kapil Malik
Hi,

Try adding this in spark-env.sh

export 
JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/usr/lib/hadoop-0.20-mapreduce/lib/native/Linux-amd64-64
export 
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/hadoop-0.20-mapreduce/lib/native/Linux-amd64-64
export 
SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/usr/lib/hadoop-0.20-mapreduce/lib/native/Linux-amd64-64
export 
SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/lib/hadoop-0.20-mapreduce/lib/snappy-java-1.0.4.1.jar

Point these to the equivalent Snappy / MapReduce native-library directories on your box.

Thanks,

Kapil Malik

From: Naveen Kumar Pokala [mailto:npok...@spcapitaliq.com]
Sent: 12 November 2014 19:59
To: user@spark.apache.org
Subject: Snappy error with Spark SQL

HI,

I am facing the following problem when I am trying to save my RDD as a Parquet
file.


14/11/12 07:43:59 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 
(TID 48,): org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:236)
org.xerial.snappy.Snappy.<clinit>(Snappy.java:48)
parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:64)

org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)

org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)

parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:109)

parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:70)

parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
parquet.column.impl.ColumnWriterImpl.flush(ColumnWriterImpl.java:199)

parquet.column.impl.ColumnWriteStoreImpl.flush(ColumnWriteStoreImpl.java:108)

parquet.hadoop.InternalParquetRecordWriter.flushStore(InternalParquetRecordWriter.java:146)

parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:110)
parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)

org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:305)

org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)

org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
14/11/12 07:43:59 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 1.0 
(TID 51,): java.lang.NoClassDefFoundError: Could not initialize class 
org.xerial.snappy.Snappy
parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:64)

org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)

org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)

parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:109)

parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:70)

parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
parquet.column.impl.ColumnWriterImpl.flush(ColumnWriterImpl.java:199)

parquet.column.impl.ColumnWriteStoreImpl.flush(ColumnWriteStoreImpl.java:108)

parquet.hadoop.InternalParquetRecordWriter.flushStore(InternalParquetRecordWriter.java:146)

parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:110)
parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)

org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:305)

org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)

org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)


Please help me.

Regards,
Naveen.





RE: Help with processing multiple RDDs

2014-11-11 Thread Kapil Malik
Hi,

How is the 78g distributed between driver, daemon and executor?

Can you please paste the logs indicating that you don't have enough memory to
hold the data in memory?
Are you collecting any data in the driver?

Lastly, did you try doing a repartition to create smaller and more evenly
distributed partitions? Something along these lines:
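
(A sketch only; the partition count and storage level are illustrative, not a
recommendation.)

import org.apache.spark.storage.StorageLevel

// repartition before caching so that no single partition (like the 25.9 GB
// rdd_22_29 below) has to be materialized at once
val even = rdd.repartition(400)
even.persist(StorageLevel.MEMORY_AND_DISK)
even.count()   // force materialization and then check how the blocks distribute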

Regards,

Kapil 

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com] 
Sent: 12 November 2014 03:44
To: u...@spark.incubator.apache.org
Subject: Help with processing multiple RDDs

I have been struggling to process a set of RDDs.  Conceptually, it is not a
large data set. It seems, no matter how much memory I provide to the JVM or how I
partition, I can't seem to process this data.  I am caching the RDD.  I have tried
persist(disk and memory), persist(memory) and persist(off_heap) with no success.
Currently I am giving 78g to my driver, daemon and executor
memory.

Currently, it seems to have trouble with one of the largest partitions,
rdd_22_29, which is 25.9 GB.

The metrics page shows Summary Metrics for 29 Completed Tasks.  However, I
don't see a few partitions in the list below. I do, however, see warnings in the
log file indicating that there is not enough memory to hold the data in memory.
I don't understand what I am doing wrong or how I can troubleshoot it. Any
pointers will be appreciated...

14/11/11 21:28:45 WARN CacheManager: Not enough space to cache partition
rdd_22_20 in memory! Free memory is 17190150496 bytes.
14/11/11 21:29:27 WARN CacheManager: Not enough space to cache partition
rdd_22_13 in memory! Free memory is 17190150496 bytes.


Block Name  Storage Level                      Size in Memory  Size on Disk  Executors
rdd_22_0    Memory Deserialized 1x Replicated  2.1 MB          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_10   Memory Deserialized 1x Replicated  7.0 GB          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_11   Memory Deserialized 1x Replicated  1290.2 MB       0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_12   Memory Deserialized 1x Replicated  1167.7 KB       0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_14   Memory Deserialized 1x Replicated  3.8 GB          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_15   Memory Deserialized 1x Replicated  4.0 MB          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_16   Memory Deserialized 1x Replicated  2.4 GB          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_17   Memory Deserialized 1x Replicated  37.6 MB         0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_18   Memory Deserialized 1x Replicated  120.9 MB        0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_19   Memory Deserialized 1x Replicated  755.9 KB        0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_2    Memory Deserialized 1x Replicated  289.5 KB        0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_21   Memory Deserialized 1x Replicated  11.9 KB         0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_22   Memory Deserialized 1x Replicated  24.0 B          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_23   Memory Deserialized 1x Replicated  24.0 B          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_24   Memory Deserialized 1x Replicated  3.0 MB          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_25   Memory Deserialized 1x Replicated  24.0 B          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_26   Memory Deserialized 1x Replicated  4.0 GB          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_27   Memory Deserialized 1x Replicated  24.0 B          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_28   Memory Deserialized 1x Replicated  1846.1 KB       0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_29   Memory Deserialized 1x Replicated  25.9 GB         0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_3    Memory Deserialized 1x Replicated  267.1 KB        0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_4    Memory Deserialized 1x Replicated  24.0 B          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_5    Memory Deserialized 1x Replicated  24.0 B          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_6    Memory Deserialized 1x Replicated  24.0 B          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_7    Memory Deserialized 1x Replicated  14.8 KB         0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_8    Memory Deserialized 1x Replicated  24.0 B          0.0 B         mddworker.c.fi-mdd-poc.internal:54974
rdd_22_9    Memory Deserialized 1x Replicated  24.0 B          0.0 B         mddworker.c.fi-mdd-poc.internal:54974




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-processing-multiple-RDDs-tp18628.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 

RE: unsubscribe

2014-03-11 Thread Kapil Malik
Ohh!

I thought you were unsubscribing :)



Kapil Malik | kma...@adobe.com | 33430 / 8800836581



-Original Message-
From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: 12 March 2014 00:51
To: user@spark.apache.org
Subject: Re: unsubscribe



To unsubscribe from this list, please send a message to
user-unsubscr...@spark.apache.org and it will automatically unsubscribe you.



Matei







On Mar 11, 2014, at 12:15 PM, Abhishek Pratap <apra...@sagebase.org> wrote: