Re: [Structured Streaming][Kafka] For a Kafka topic with 3 partitions, how does the parallelism work?

2018-04-21 Thread Raghavendra Pandey
Yes as long as there are 3 cores available on your local machine.
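
A minimal sketch of making that explicit (only the master string differs from
the code in your snippet below; local[*] also works if the machine has at
least 3 cores):

import org.apache.spark.sql.SparkSession

// Each of the 3 topic partitions becomes one task per micro-batch,
// so the local master needs at least 3 cores to run them concurrently.
val spark = SparkSession
  .builder
  .appName("TestPartition")
  .master("local[3]")
  .getOrCreate()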

On Fri, Apr 20, 2018 at 10:56 AM karthikjay  wrote:

> I have the following code to read data from Kafka topic using the
> structured
> streaming. The topic has 3 partitions:
>
>  val spark = SparkSession
>   .builder
>   .appName("TestPartition")
>   .master("local[*]")
>   .getOrCreate()
>
> import spark.implicits._
>
> val dataFrame = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers",
> "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
>   .option("subscribe", "partition_test")
>   .option("failOnDataLoss", "false")
>   .load()
>   .selectExpr("CAST(value AS STRING)")
>
> My understanding is that Spark will launch 3 Kafka consumers (for 3
> partitions) and these 3 consumers will be running on the worker nodes. Is
> my
> understanding right ?
>
>
>


Kafka version support

2017-11-29 Thread Raghavendra Pandey
Just wondering if anyone has tried the Spark Structured Streaming Kafka
connector (2.2) with Kafka 0.11 or Kafka 1.0.

Thanks
Raghav


Multiple queries on same stream

2017-08-08 Thread Raghavendra Pandey
I am using Structured Streaming to evaluate multiple rules on the same running
stream.
I have two options to do that. One is to use foreach and evaluate all the
rules on each row.
The other option is to express the rules in the Spark SQL DSL and run multiple
queries.
I was wondering whether option 1 will result in better performance, even
though I only get Catalyst optimization with option 2.

Thanks
Raghav


Re: Spark Streaming: Async action scheduling inside foreachRDD

2017-08-04 Thread Raghavendra Pandey
Did you try SparkContext.addSparkListener?
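
A minimal sketch of that idea, registered before the streaming context starts;
the batchFailed flag and the check are made up for illustration:

import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd}

@volatile var batchFailed = false  // hypothetical flag, checked before committing offsets

sc.addSparkListener(new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // any job in the batch that did not succeed marks the batch as failed
    if (jobEnd.jobResult != JobSucceeded) batchFailed = true
  }
})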



On Aug 3, 2017 1:54 AM, "Andrii Biletskyi"
 wrote:

> Hi all,
>
> What is the correct way to schedule multiple jobs inside foreachRDD method
> and importantly await on result to ensure those jobs have completed
> successfully?
> E.g.:
>
> kafkaDStream.foreachRDD{ rdd =>
> val rdd1 = rdd.map(...)
> val rdd2 = rdd1.map(...)
>
> val job1Future = Future{
> rdd1.saveToCassandra(...)
> }
>
> val job2Future = Future{
> rdd1.foreachPartition( iter => /* save to Kafka */)
> }
>
>   Await.result(
>   Future.sequence(job1Future, job2Future),
>   Duration.Inf)
>
>
>// commit Kafka offsets
> }
>
> In this code I'm scheduling two actions in futures and awaiting them. I
> need to be sure when I commit Kafka offsets at the end of the batch
> processing that job1 and job2 have actually executed successfully. Does
> given approach provide these guarantees? I.e. in case one of the jobs fails
> the entire batch will be marked as failed too?
>
>
> Thanks,
> Andrii
>


Re: How do I dynamically add nodes to spark standalone cluster and be able to discover them?

2017-01-25 Thread Raghavendra Pandey
When you start a slave, you pass the address of the master as a parameter. The
slave will then contact the master and register itself.

On Jan 25, 2017 4:12 AM, "kant kodali"  wrote:

> Hi,
>
> How do I dynamically add nodes to spark standalone cluster and be able to
> discover them? Does Zookeeper do service discovery? What is the standard
> tool for these things?
>
> Thanks,
> kant
>


Re: Nested ifs in sparksql

2017-01-11 Thread Raghavendra Pandey
I am not using case when; it is mostly IF. By slow, I mean 6 minutes even for
10 records with 41 levels of nested ifs.
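
For what it's worth, a minimal sketch of the UDF route suggested below -- df,
colA, colB and the branching logic are made up, not the real 41-level rule set:

import org.apache.spark.sql.functions.udf

// The deep branching lives in one Scala function, so Catalyst sees a single
// expression instead of 41 levels of generated IF columns.
val classify = udf { (a: Int, b: Int) =>
  if (a > 0) { if (b > 0) "both" else "a-only" }
  else       { if (b > 0) "b-only" else "none" }
}

val result = df.withColumn("label", classify(df("colA"), df("colB")))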

On Jan 11, 2017 3:31 PM, "Georg Heiler" <georg.kf.hei...@gmail.com> wrote:

> I was using the DataFrame API, not SQL. The main problem was that too much
> code was generated.
> Using a UDF turned out to be quicker as well.
> Olivier Girardot <o.girar...@lateral-thoughts.com> schrieb am Di. 10.
> Jan. 2017 um 21:54:
>
>> Are you using the "case when" functions ? what do you mean by slow ? can
>> you share a snippet ?
>>
>>
>>
>> On Tue, Jan 10, 2017 8:15 PM, Georg Heiler georg.kf.hei...@gmail.com
>> wrote:
>>
>> Maybe you can create an UDF?
>>
>> Raghavendra Pandey <raghavendra.pan...@gmail.com> schrieb am Di., 10.
>> Jan. 2017 um 20:04 Uhr:
>>
>> I have around 41 levels of nested if-else in Spark SQL. I have
>> programmed it using the DataFrame APIs, but it takes too much time.
>> Is there anything I can do to improve the time here?
>>
>>
>>
>> *Olivier Girardot* | Associé
>> o.girar...@lateral-thoughts.com
>> +33 6 24 09 17 94
>>
>


Nested ifs in sparksql

2017-01-10 Thread Raghavendra Pandey
I have around 41 levels of nested if-else in Spark SQL. I have programmed
it using the DataFrame APIs, but it takes too much time.
Is there anything I can do to improve the time here?


Re: Access HDFS within Spark Map Operation

2016-09-13 Thread Raghavendra Pandey
How large is your first text file? The idea is that you read the first text
file and, if it is not large, collect all the lines on the driver, then read
a text file for each line and union all the RDDs.
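
A rough sketch of that approach, with made-up paths and a hypothetical
process() function:

// Read the (small) list of per-record file paths on the driver ...
val paths = sc.textFile("hdfs:///data/paths.txt").collect()

// ... then build one RDD per referenced file and union them; no SparkContext
// is needed inside any map() closure.
val perFile = paths.map(p => sc.textFile(p).map(line => process(p, line)))
val combined = sc.union(perFile.toSeq)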

On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake"  wrote:

> Just wonder if this is possible with Spark?
>
> On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake 
> wrote:
>
>> Hi,
>>
>> I've got a text file where each line is a record. For each record, I need
>> to process a file in HDFS.
>>
>> So if I represent these records as an RDD and invoke a map() operation on
>> them how can I access the HDFS within that map()? Do I have to create a
>> Spark context within map() or is there a better solution to that?
>>
>> Thank you,
>> Saliya
>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


Re: Passing Custom App Id for consumption in History Server

2016-09-03 Thread Raghavendra Pandey
The default implementation uses the current time in milliseconds. For Mesos it
is the framework-id. If you are using Mesos, you can assume that the framework
id used to register your app is the same as the app-id.
As you said, you have a system application to schedule Spark jobs, so you can
keep track of the framework-ids submitted by your application and use the
same information.

On Fri, Sep 2, 2016 at 6:29 PM, Amit Shanker 
wrote:

> Currently Spark sets current time in Milliseconds as the app Id. Is there
> a way one can pass in the app id to the spark job, so that it uses this
> provided app id instead of generating one using time?
>
> Lets take the following scenario : I have a system application which
> schedules spark jobs, and records the metadata for that job (say job
> params, cores, etc). In this system application, I want to link every job
> with its corresponding UI (history server). The only way I can do this is
> if I have the app Id of that job stored in this system application. And the
> only way one can get the app Id is by using the
> SparkContext.getApplicationId() function - which needs to be run from
> inside the job. So, this make it difficult to convey this piece of
> information from spark to a system outside spark.
>
> Thanks,
> Amit Shanker
>


Re: how to pass trustStore path into pyspark ?

2016-09-03 Thread Raghavendra Pandey
Did you try passing them in spark-env.sh?

On Sat, Sep 3, 2016 at 2:42 AM, Eric Ho  wrote:

> I'm trying to pass a trustStore pathname into pyspark.
> What env variable and/or config file or script I need to change to do this
> ?
> I've tried setting JAVA_OPTS env var but to no avail...
> any pointer much appreciated...  thx
>
> --
>
> -eric ho
>
>


Re: Importing large file with SparkContext.textFile

2016-09-03 Thread Raghavendra Pandey
If your file format is splittable (say TSV, CSV, etc.), it will be distributed
across all executors.
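
A small sketch to check this, with a hypothetical path (the second argument is
a minimum-partition hint):

val rdd = sc.textFile("hdfs:///data/somefile.csv", 64)
println(rdd.partitions.length)  // roughly one partition per HDFS block, or at least 64 here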

On Sat, Sep 3, 2016 at 3:38 PM, Somasundaram Sekar <
somasundar.se...@tigeranalytics.com> wrote:

> Hi All,
>
>
>
> Would like to gain some understanding on the questions listed below,
>
>
>
> 1.   When processing a large file with Apache Spark, with, say,
> sc.textFile("somefile.xml"), does it split it for parallel processing
> across executors or, will it be processed as a single chunk in a single
> executor?
>
> 2.   When using dataframes, with implicit XMLContext from Databricks
> is there any optimization prebuilt for such large file processing?
>
>
>
> Please help!!!
>
>
>
> http://stackoverflow.com/questions/39305310/does-spark-
> process-large-file-in-the-single-worker
>
>
>
> Regards,
>
> Somasundaram S
>


Re: Catalog, SessionCatalog and ExternalCatalog in spark 2.0

2016-09-03 Thread Raghavendra Pandey
Kapil -- I am afraid you need to plug in your own SessionCatalog, as the
ResolveRelations class depends on it. To keep the design consistent, you may
want to implement ExternalCatalog as well.
You can also look at plugging in your own Analyzer class to give you more
flexibility. Ultimately that is where all relations get resolved via the
SessionCatalog.

On Sat, Sep 3, 2016 at 5:49 PM, Kapil Malik 
wrote:

> Hi all,
>
> I have a Spark SQL 1.6 application in production which does following on
> executing sqlContext.sql(...) -
> 1. Identify the table-name mentioned in query
> 2. Use an external database to decide where's the data located, in which
> format (parquet or csv or jdbc) etc.
> 3. Load the dataframe
> 4. Register it as temp table (for future calls to this table)
>
> This is achieved by extending HiveContext, and correspondingly
> HiveCatalog. I have my own implementation of trait "Catalog", which
> over-rides the "lookupRelation" method to do the magic behind the scenes.
>
> However, in spark 2.0, I can see following -
> SessionCatalog - which contains lookupRelation method, but doesn't have
> any interface / abstract class to it.
> ExternalCatalog - which deals with CatalogTable instead of Df /
> LogicalPlan.
> Catalog - which also doesn't expose any method to lookup Df / LogicalPlan.
>
> So apparently it looks like I need to extend SessionCatalog only.
> However, just wanted to get a feedback on if there's a better /
> recommended approach to achieve this.
>
>
> Thanks and regards,
>
>
> Kapil Malik
> *Sr. Principal Engineer | Data Platform, Technology*
> M: +91 8800836581 | T: 0124-433 | EXT: 20910
> ASF Centre A | 1st Floor | Udyog Vihar Phase IV |
> Gurgaon | Haryana | India
>
> *Disclaimer:* This communication is for the sole use of the addressee and
> is confidential and privileged information. If you are not the intended
> recipient of this communication, you are prohibited from disclosing it and
> are required to delete it forthwith. Please note that the contents of this
> communication do not necessarily represent the views of Jasper Infotech
> Private Limited ("Company"). E-mail transmission cannot be guaranteed to be
> secure or error-free as information could be intercepted, corrupted, lost,
> destroyed, arrive late or incomplete, or contain viruses. The Company,
> therefore, does not accept liability for any loss caused due to this
> communication. *Jasper Infotech Private Limited, Registered Office: 1st
> Floor, Plot 238, Okhla Industrial Estate, New Delhi - 110020 INDIA CIN:
> U72300DL2007PTC168097*
>
>


Re: My notes on Spark Performance & Tuning Guide

2016-05-17 Thread Raghavendra Pandey
Can you please send me as well.

Thanks
Raghav
On 12 May 2016 20:02, "Tom Ellis"  wrote:

> I would like to also Mich, please send it through, thanks!
>
> On Thu, 12 May 2016 at 15:14 Alonso Isidoro  wrote:
>
>> Me too, send me the guide.
>>
>> Enviado desde mi iPhone
>>
>> El 12 may 2016, a las 12:11, Ashok Kumar > > escribió:
>>
>> Hi Dr Mich,
>>
>> I will be very keen to have a look at it and review if possible.
>>
>> Please forward me a copy
>>
>> Thanking you warmly
>>
>>
>> On Thursday, 12 May 2016, 11:08, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>
>> Hi Al,,
>>
>>
>> Following the threads in spark forum, I decided to write up on
>> configuration of Spark including allocation of resources and configuration
>> of driver, executors, threads, execution of Spark apps and general
>> troubleshooting taking into account the allocation of resources for Spark
>> applications and OS tools at the disposal.
>>
>> Since the most widespread configuration as I notice is with "Spark
>> Standalone Mode", I have decided to write these notes starting with
>> Standalone and later on moving to Yarn
>>
>>
>>- *Standalone *– a simple cluster manager included with Spark that
>>makes it easy to set up a cluster.
>>- *YARN* – the resource manager in Hadoop 2.
>>
>>
>> I would appreciate if anyone interested in reading and commenting to get
>> in touch with me directly on mich.talebza...@gmail.com so I can send the
>> write-up for their review and comments.
>>
>> Just to be clear this is not meant to be any commercial proposition or
>> anything like that. As I seem to get involved with members troubleshooting
>> issues and threads on this topic, I thought it is worthwhile writing a note
>> about it to summarise the findings for the benefit of the community.
>>
>> Regards.
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>


Re: Executor memory requirement for reduceByKey

2016-05-17 Thread Raghavendra Pandey
Even though it does not sound intuitive, reduceByKey expects all the values
for a particular key within a partition to be loaded into memory. So once you
increase the number of partitions, you can run the jobs.
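
A minimal sketch, assuming a pair RDD called pairs; 400 is an arbitrary count:

// The second argument sets the number of reduce-side partitions, so each
// partition has to hold the values of fewer keys in memory.
val reduced = pairs.reduceByKey(_ + _, 400)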


Re: Not able pass 3rd party jars to mesos executors

2016-05-11 Thread Raghavendra Pandey
You have kept the 3rd-party jars on HDFS. I don't think executors, as of
today, can download jars from HDFS. Can you try with a shared directory?
The application jar is downloaded by executors through the HTTP server.

-Raghav
On 12 May 2016 00:04, "Giri P" <gpatc...@gmail.com> wrote:

> Yes..They are  reachable. Application jar which I send as argument is at
> same location as third party jar. Application jar is getting uploaded.
>
> On Wed, May 11, 2016 at 10:51 AM, lalit sharma <lalitkishor...@gmail.com>
> wrote:
>
>> Point to note as per docs as well :
>>
>> *Note that jars or python files that are passed to spark-submit should be
>> URIs reachable by Mesos slaves, as the Spark driver doesn’t automatically
>> upload local jars.**http://spark.apache.org/docs/latest/running-on-mesos.html
>> <http://spark.apache.org/docs/latest/running-on-mesos.html> *
>>
>> On Wed, May 11, 2016 at 10:05 PM, Giri P <gpatc...@gmail.com> wrote:
>>
>>> I'm not using docker
>>>
>>> On Wed, May 11, 2016 at 8:47 AM, Raghavendra Pandey <
>>> raghavendra.pan...@gmail.com> wrote:
>>>
>>>> By any chance, are you using docker to execute?
>>>> On 11 May 2016 21:16, "Raghavendra Pandey" <
>>>> raghavendra.pan...@gmail.com> wrote:
>>>>
>>>>> On 11 May 2016 02:13, "gpatcham" <gpatc...@gmail.com> wrote:
>>>>>
>>>>> >
>>>>>
>>>>> > Hi All,
>>>>> >
>>>>> > I'm using --jars option in spark-submit to send 3rd party jars . But
>>>>> I don't
>>>>> > see they are actually passed to mesos slaves. Getting Noclass found
>>>>> > exceptions.
>>>>> >
>>>>> > This is how I'm using --jars option
>>>>> >
>>>>> > --jars hdfs://namenode:8082/user/path/to/jar
>>>>> >
>>>>> > Am I missing something here or what's the correct  way to do ?
>>>>> >
>>>>> > Thanks
>>>>> >
>>>>> >
>>>>> >
>>>>> > --
>>>>> > View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Not-able-pass-3rd-party-jars-to-mesos-executors-tp26918.html
>>>>> > Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>> >
>>>>> >
>>>>>
>>>>
>>>
>>
>


Re: Not able pass 3rd party jars to mesos executors

2016-05-11 Thread Raghavendra Pandey
By any chance, are you using docker to execute?
On 11 May 2016 21:16, "Raghavendra Pandey" <raghavendra.pan...@gmail.com>
wrote:

> On 11 May 2016 02:13, "gpatcham" <gpatc...@gmail.com> wrote:
>
> >
>
> > Hi All,
> >
> > I'm using --jars option in spark-submit to send 3rd party jars . But I
> don't
> > see they are actually passed to mesos slaves. Getting Noclass found
> > exceptions.
> >
> > This is how I'm using --jars option
> >
> > --jars hdfs://namenode:8082/user/path/to/jar
> >
> > Am I missing something here or what's the correct  way to do ?
> >
> > Thanks
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Not-able-pass-3rd-party-jars-to-mesos-executors-tp26918.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>


Re: Not able pass 3rd party jars to mesos executors

2016-05-11 Thread Raghavendra Pandey
On 11 May 2016 02:13, "gpatcham"  wrote:

>

> Hi All,
>
> I'm using --jars option in spark-submit to send 3rd party jars . But I
don't
> see they are actually passed to mesos slaves. Getting Noclass found
> exceptions.
>
> This is how I'm using --jars option
>
> --jars hdfs://namenode:8082/user/path/to/jar
>
> Am I missing something here or what's the correct  way to do ?
>
> Thanks
>
>
>
> --
> View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Not-able-pass-3rd-party-jars-to-mesos-executors-tp26918.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: Spark 1.6.0: substring on df.select

2016-05-11 Thread Raghavendra Pandey
You can create a column with the count of "/". Then take the max of it and
create that many columns for every row, with null fillers.
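
If you only need the last segment (the method), another sketch for Spark 1.6
that mirrors the Pig SUBSTRING + LAST_INDEX_OF idea; assuming df is the
DataFrame and col1 is the column from the question:

import org.apache.spark.sql.functions.substring_index

// count = -1 keeps everything after the last "/", however many "/" there are
val withMethod = df.withColumn("method", substring_index(df("col1"), "/", -1))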

Raghav
On 11 May 2016 20:37, "Bharathi Raja"  wrote:

Hi,



I have a dataframe column col1 with values something like
“/client/service/version/method”. The number of “/” are not constant.

Could you please help me to extract all methods from the column col1?



In Pig i used SUBSTRING with LAST_INDEX_OF(“/”).



Thanks in advance.

Regards,

Raja


Re: [Spark 1.4.1] StructField for date column in CSV file while creating custom schema

2015-12-29 Thread Raghavendra Pandey
You can use the date type...
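
A minimal sketch, assuming a spark-csv version that supports the dateFormat
option; the column names and path are made up:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("event_date", DateType, nullable = true)))

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("dateFormat", "dd/MM/yyyy")  // matches values like 25/11/2014
  .schema(schema)
  .load("input.csv")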
On Dec 29, 2015 9:02 AM, "Divya Gehlot"  wrote:

> Hi,
> I am newbee to Spark ,
> My appologies for such a naive question
> I am using Spark 1.4.1 and wrtiting code in scala . I have input data as
> CSVfile  which I am parsing using spark-csv package . I am creating custom
> schema to process the CSV file .
> Now my query is which dataype or can say  Structfield should I use for
> Date column of my CSV file.
> I am using hivecontext and have requirement to create hive table after
> processing the CSV file.
> For example my date columnin CSV file  looks like
>
> 25/11/2014 20/9/2015 25/10/2015 31/10/2012 25/9/2013 25/11/2012 20/10/2013
> 25/10/2011
>
>


Re: Save to Parquet files failed

2015-10-22 Thread Raghavendra Pandey
Can you increase the number of partitions and try... Also, I don't think you
need to cache the DFs before saving them... You can do away with that as well...
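
A minimal sketch of the first suggestion (ratingDF is from your code; 200 is
an arbitrary partition count and the path is shortened):

ratingDF.repartition(200)
  .write.format("parquet")
  .save("file:///tmp/ratings.parquet")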

Raghav
On Oct 23, 2015 7:45 AM, "Ram VISWANADHA" 
wrote:

> Hi ,
> I am trying to load 931MB file into an RDD, then create a DataFrame and
> store the data in a Parquet file. The save method of Parquet file is
> hanging. I have set the timeout to 1800 but still the system fails to
> respond and hangs. I can’t spot any errors in my code. Can someone help me?
> Thanks in advance.
>
> Environment
>
>1. OS X 10.10.5 with 8G RAM
>2. JDK 1.8.0_60
>
> Code
>
> final SQLContext sqlContext = new SQLContext(jsc);
> //convert user viewing history to ratings (hash user_id to int)
> JavaRDD ratingJavaRDD = createMappedRatingsRDD(jsc);
> //for testing with 2d_full.txt data
> //JavaRDD ratingJavaRDD = createMappedRatingRDDFromFile(jsc);
> JavaRDD ratingRowsRDD = ratingJavaRDD.map(new GenericRowFromRating());
> ratingRowsRDD.cache();
>
> //This line saves the files correctly
>
> ratingJavaRDD.saveAsTextFile("file:///Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd");
>
> final DataFrame ratingDF = sqlContext.createDataFrame(ratingRowsRDD,
> getStructTypeForRating());
> ratingDF.registerTempTable("rating_db");
> ratingDF.show();
> ratingDF.cache();
>
> //this line hangs
>
> ratingDF.write().format("parquet").save(
> "file:///Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet"
> );
>
>
> wks-195:rec-spark-java-poc r.viswanadha$ ls -lah
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-*
>
> -rw-r--r--  1 r.viswanadha  staff   785K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-0
>
> -rw-r--r--  1 r.viswanadha  staff   790K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-1
>
> -rw-r--r--  1 r.viswanadha  staff   786K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-2
>
> -rw-r--r--  1 r.viswanadha  staff   796K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-3
>
> -rw-r--r--  1 r.viswanadha  staff   791K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-4
>
> wks-195:rec-spark-java-poc r.viswanadha$ ls -lah
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet/_temporary/0/
>
> The only thing that is saved is the temporary part file
>
> wks-195:rec-spark-java-poc r.viswanadha$ ls -lah
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet/_temporary/0/task_201510221857_0007_m_00/
>
> total 336
>
> drwxr-xr-x  4 r.viswanadha  staff   136B Oct 22 18:57 .
>
> drwxr-xr-x  4 r.viswanadha  staff   136B Oct 22 18:57 ..
>
> -rw-r--r--  1 r.viswanadha  staff   1.3K Oct 22 18:57
> .part-r-0-65562f67-357c-4645-8075-13b733a71ee5.gz.parquet.crc
>
> -rw-r--r--  1 r.viswanadha  staff   163K Oct 22 18:57
> part-r-0-65562f67-357c-4645-8075-13b733a71ee5.gz.parquet
>
>
> Active Stages (1) Stage Id Description Submitted Duration Tasks:
> Succeeded/Total Input Output Shuffle Read Shuffle Write 7 (kill)
> save at
> Recommender.java:549 
> +details
> 
>
> 2015/10/22 18:57:15 17 min
> 1/5
> 9.4 MB
> Best Regards,
> Ram
>
>


Re: REST api to avoid spark context creation

2015-10-18 Thread Raghavendra Pandey
You may like to look at spark job server.
https://github.com/spark-jobserver/spark-jobserver

Raghavendra


Re: repartition vs partitionby

2015-10-17 Thread Raghavendra Pandey
You can use the coalesce function if you want to reduce the number of
partitions. It minimizes the data shuffle.
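
A minimal sketch of the difference (the RDD and the counts are arbitrary):

val fewer = rdd.coalesce(100)          // e.g. 1000 -> 100 partitions, avoids a full shuffle
val rebalanced = rdd.repartition(500)  // full shuffle, when partitions need rebalancing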

-Raghav

On Sat, Oct 17, 2015 at 1:02 PM, shahid qadri 
wrote:

> Hi folks
>
> I need to reparation large set of data around(300G) as i see some portions
> have large data(data skew)
>
> i have pairRDDs [({},{}),({},{}),({},{})]
>
> what is the best way to solve the problem
>
>


Re: s3a file system and spark deployment mode

2015-10-17 Thread Raghavendra Pandey
You can add classpath info in hadoop env file...

Add the following line to your $HADOOP_HOME/etc/hadoop/hadoop-env.sh
export
HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/tools/lib/*

Add the following line to $SPARK_HOME/conf/spark-env.sh
export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop --config
$HADOOP_HOME/etc/hadoop classpath)


This is how you set up Hadoop 2.7.1 with the Spark 1.5.1 "without Hadoop"
build. It will also put the necessary jars on your classpath to access s3a.

Also, please note that you need to set the fs.s3a.access.key
and fs.s3a.secret.key properties in your core-site.xml, rather
than fs.s3a.awsSecretAccessKey and fs.s3a.awsAccessKeyId as mentioned in
the docs.

Good luck
-Raghav

On Fri, Oct 16, 2015 at 9:07 PM, Scott Reynolds 
wrote:

> hmm I tried using --jars and that got passed to MasterArguments and that
> doesn't work :-(
>
>
> https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
>
> Same with Worker:
> https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
>
> Both Master and Worker have to start with these two jars because
> a.) the Master has to serve the event log in s3
> b.) the Worker runs the Driver and has to download the jar from s3
>
> And yes I am using these deps:
>
> 
> 
> org.apache.hadoop
> hadoop-aws
> 2.7.1
> 
>
> 
> com.amazonaws
> aws-java-sdk
> 1.7.4
> 
>
> I think I have settled on just modifying the java command line that starts
> up the worker and master. Just seems easier. Currently launching them with
> spark-class bash script
>
> /mnt/services/spark/bin/spark-class org.apache.spark.deploy.master.Master \
> --ip `hostname -i` --port 7077 --webui-port 8080
>
> If all else fails I will update the spark pom and and include it in the
> shaded spark jar.
>
> On Fri, Oct 16, 2015 at 2:25 AM, Steve Loughran 
> wrote:
>
>>
>> > On 15 Oct 2015, at 19:04, Scott Reynolds  wrote:
>> >
>> > List,
>> >
>> > Right now we build our spark jobs with the s3a hadoop client. We do
>> this because our machines are only allowed to use IAM access to the s3
>> store. We can build our jars with the s3a filesystem and the aws sdk just
>> fine and this jars run great in *client mode*.
>> >
>> > We would like to move from client mode to cluster mode as that will
>> allow us to be more resilient to driver failure. In order to do this either:
>> > 1. the jar file has to be on worker's local disk
>> > 2. the jar file is in shared storage (s3a)
>> >
>> > We would like to put the jar file in s3 storage, but when we give the
>> jar path as s3a://.., the worker node doesn't have the hadoop s3a and
>> aws sdk in its classpath / uber jar.
>> >
>> > Other then building spark with those two dependencies, what other
>> options do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a
>> thing.
>> >
>> > Need to get s3a access to both the master (so that we can log spark
>> event log to s3) and to the worker processes (driver, executor).
>> >
>> > Looking for ideas before just adding the dependencies to our spark
>> build and calling it a day.
>>
>>
>> you can use --jars to add these, e.g
>>
>> -jars hadoop-aws.jar,aws-java-sdk-s3
>>
>>
>> as others have warned, you need Hadoop 2.7.1 for s3a to work proplery
>>
>
>


Re: Complex transformation on a dataframe column

2015-10-17 Thread Raghavendra Pandey
Here is a quick code sample I can come up with:

case class Input(ID:String, Name:String, PhoneNumber:String, Address:
String)
val df = sc.parallelize(Seq(Input("1", "raghav", "0123456789",
"houseNo:StreetNo:City:State:Zip"))).toDF()
val formatAddress = udf { (s: String) => s.split(":").mkString("-")}
val outputDF = df.withColumn("FormattedAddress",
formatAddress(df("Address")))


-Raghav

On Thu, Oct 15, 2015 at 10:34 PM, Hao Wang  wrote:

> Hi,
>
> I have searched around but could not find a satisfying answer to this
> question: what is the best way to do a complex transformation on a
> dataframe column?
>
> For example, I have a dataframe with the following schema and a function
> that has pretty complex logic to format addresses. I would like to use the
> function to format each address and store the output as an additional
> column in the dataframe. What is the best way to do it? Use Dataframe.map?
> Define a UDF? Some code example would be appreciated.
>
> Input dataframe:
> root
>  |-- ID: string (nullable = true)
>  |-- Name: string (nullable = true)
>  |-- PhoneNumber: string (nullable = true)
>  |-- Address: string (nullable = true)
>
> Output dataframe:
> root
>  |-- ID: string (nullable = true)
>  |-- Name: string (nullable = true)
>  |-- PhoneNumber: string (nullable = true)
>  |-- Address: string (nullable = true)
>  |-- FormattedAddress: string (nullable = true)
>
> The function for format addresses:
> def formatAddress(address: String): String
>
>
> Best regards,
> Hao Wang
>
>
>


Re: s3a file system and spark deployment mode

2015-10-15 Thread Raghavendra Pandey
You can use the Spark 1.5.1 "without Hadoop" build with Hadoop 2.7.1.
Hadoop 2.7.1 is more mature for s3a access. You also need to add the Hadoop
tools dir to the Hadoop classpath...

Raghav
On Oct 16, 2015 1:09 AM, "Scott Reynolds"  wrote:

> We do not use EMR. This is deployed on Amazon VMs
>
> We build Spark with Hadoop-2.6.0 but that does not include the s3a
> filesystem nor the Amazon AWS SDK
>
> On Thu, Oct 15, 2015 at 12:26 PM, Spark Newbie 
> wrote:
>
>> Are you using EMR?
>> You can install Hadoop-2.6.0 along with Spark-1.5.1 in your EMR cluster.
>> And that brings s3a jars to the worker nodes and it becomes available to
>> your application.
>>
>> On Thu, Oct 15, 2015 at 11:04 AM, Scott Reynolds 
>> wrote:
>>
>>> List,
>>>
>>> Right now we build our spark jobs with the s3a hadoop client. We do this
>>> because our machines are only allowed to use IAM access to the s3 store. We
>>> can build our jars with the s3a filesystem and the aws sdk just fine and
>>> this jars run great in *client mode*.
>>>
>>> We would like to move from client mode to cluster mode as that will
>>> allow us to be more resilient to driver failure. In order to do this either:
>>> 1. the jar file has to be on worker's local disk
>>> 2. the jar file is in shared storage (s3a)
>>>
>>> We would like to put the jar file in s3 storage, but when we give the
>>> jar path as s3a://.., the worker node doesn't have the hadoop s3a and
>>> aws sdk in its classpath / uber jar.
>>>
>>> Other then building spark with those two dependencies, what other
>>> options do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a
>>> thing.
>>>
>>> Need to get s3a access to both the master (so that we can log spark
>>> event log to s3) and to the worker processes (driver, executor).
>>>
>>> Looking for ideas before just adding the dependencies to our spark build
>>> and calling it a day.
>>>
>>
>>
>


Re: Problem installing Spark on Windows 8

2015-10-14 Thread Raghavendra Pandey
Looks like you are facing an IPv6 issue. Can you try turning on the
preferIPv4 property (the JVM flag -Djava.net.preferIPv4Stack=true)?
On Oct 15, 2015 2:10 AM, "Steve Loughran"  wrote:

>
> On 14 Oct 2015, at 20:56, Marco Mistroni  wrote:
>
>
> 15/10/14 20:52:35 WARN : Your hostname, MarcoLaptop resolves to a
> loopback/non-r
> eachable address: fe80:0:0:0:c5ed:a66d:9d95:5caa%wlan2, but we couldn't
> find any
>  external IP address!
> java.lang.RuntimeException: java.lang.RuntimeException: The root scratch
> dir: /t
> mp/hive on HDFS should be writable. Current permissions are: -
> at
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav
> a:522)
> at
> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.s
> cala:171)
> at
> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveCo
>
>
> now, that I haven't seen. Looks like it thinks the permissions are wrong,
> doesn't it?
>


Re: Spark Master Dying saying TimeoutException

2015-10-14 Thread Raghavendra Pandey
I fixed these timeout errors by retrying...
On Oct 15, 2015 3:41 AM, "Kartik Mathur"  wrote:

> Hi,
>
> I have some nightly jobs which runs every night but dies sometimes because
> of unresponsive master , spark master logs says -
>
> Not seeing much else there , what could possible cause an exception like
> this.
>
> *Exception in thread "main" java.util.concurrent.TimeoutException: Futures
> timed out after [1 milliseconds]*
>
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
> at scala.concurrent.Await$.result(package.scala:107)
>
> at akka.remote.Remoting.start(Remoting.scala:180)
>
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
>
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
>
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
>
> 2015-10-14 05:43:04 ERROR Remoting:65 - Remoting error: [Startup timed
> out] [
>
> akka.remote.RemoteTransportException: Startup timed out
>
> at
> akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)
>
> at akka.remote.Remoting.start(Remoting.scala:198)
>
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
>
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
>
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
>
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
>
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
>
> at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
>
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
>
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
>
> at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
>
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>
> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
>
> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
>
> at
> org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:906)
>
> at org.apache.spark.deploy.master.Master$.main(Master.scala:869)
>
> at org.apache.spark.deploy.master.Master.main(Master.scala)
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [1 milliseconds]
>
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
> at scala.concurrent.Await$.result(package.scala:107)
>
> at akka.remote.Remoting.start(Remoting.scala:180)
>
> ... 17 more
>
>
>


Re: How to compile Spark with customized Hadoop?

2015-10-10 Thread Raghavendra Pandey
There is a "Spark without Hadoop" build. You can use that to link with any
custom Hadoop version.

Raghav
On Oct 10, 2015 5:34 PM, "Steve Loughran"  wrote:

>
> During development, I'd recommend giving Hadoop a version ending with
> -SNAPSHOT, and building spark with maven, as mvn knows to refresh the
> snapshot every day.
>
> you can do this in hadoop with
>
> mvn versions:set 2.7.0.stevel-SNAPSHOT
>
> if you are working on hadoop branch-2 or trunk direct, they come with
> -SNAPSHOT anyway, but unless you build hadoop every morning, you may find
> maven pulls in the latest nightly builds from the apache snapshot
> repository, which will cause chaos and confusion. This is also why you must
> never have maven build which spans midnight in your time zone.
>
>
> On 9 Oct 2015, at 22:31, Matei Zaharia  wrote:
>
> You can publish your version of Hadoop to your Maven cache with mvn
> publish (just give it a different version number, e.g. 2.7.0a) and then
> pass that as the Hadoop version to Spark's build (see
> http://spark.apache.org/docs/latest/building-spark.html).
>
> Matei
>
> On Oct 9, 2015, at 3:10 PM, Dogtail L  wrote:
>
> Hi all,
>
> I have modified Hadoop source code, and I want to compile Spark with my
> modified Hadoop. Do you know how to do that? Great thanks!
>
>
>
>


Re: Spark based Kafka Producer

2015-09-11 Thread Raghavendra Pandey
You can pass the number of executors via the command-line option
--num-executors. You need more than 2 executors to make Spark Streaming
work.

For more details on the command-line options, please go through
http://spark.apache.org/docs/latest/running-on-yarn.html.


On Fri, Sep 11, 2015 at 10:52 AM, Atul Kulkarni <atulskulka...@gmail.com>
wrote:

> I am submitting the job with yarn-cluster mode.
>
> spark-submit --master yarn-cluster ...
>
> On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> What is the value of spark master conf.. By default it is local, that
>> means only one thread can run and that is why your job is stuck.
>> Specify it local[*], to make thread pool equal to number of cores...
>>
>> Raghav
>> On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com> wrote:
>>
>>> Hi Folks,
>>>
>>> Below is the code  have for Spark based Kafka Producer to take advantage
>>> of multiple executors reading files in parallel on my cluster but I am
>>> stuck at The program not making any progress.
>>>
>>> Below is my scrubbed code:
>>>
>>> val sparkConf = new SparkConf().setAppName(applicationName)
>>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>
>>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>>
>>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>>> zipFileDStreams.foreachRDD {
>>>   rdd =>
>>> rdd.foreachPartition(
>>>   partition => {
>>> partition.foreach{
>>>   case (logLineText) =>
>>> println(logLineText)
>>> producerObj.value.send(topics, logLineText)
>>> }
>>>   }
>>> )
>>> }
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> ssc.stop()
>>>
>>> The code for KafkaSink is as follows.
>>>
>>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], 
>>> Array[Byte]]) extends Serializable {
>>>
>>>   lazy val producer = createProducer()
>>>   val logParser = new LogParser()
>>>
>>>   def send(topic: String, value: String): Unit = {
>>>
>>> val logLineBytes = 
>>> Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>> producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
>>> logLineBytes))
>>>   }
>>> }
>>>
>>> object KafkaSink {
>>>   def apply(config: Properties): KafkaSink = {
>>>
>>> val f = () => {
>>>   val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, 
>>> null, null)
>>>
>>>   sys.addShutdownHook {
>>> producer.close()
>>>   }
>>>   producer
>>> }
>>>
>>> new KafkaSink(f)
>>>   }
>>> }
>>>
>>> Disclaimer: it is based on the code inspired by
>>> http://allegro.tech/spark-kafka-integration.html.
>>>
>>> The job just sits there I cannot see any Job Stages being created.
>>> Something I want to mention - I I am trying to read gzipped files from HDFS
>>> - could it be that Streaming context is not able to read *.gz files?
>>>
>>>
>>> I am not sure what more details I can provide to help explain my problem.
>>>
>>>
>>> --
>>> Regards,
>>> Atul Kulkarni
>>>
>>
>
>
> --
> Regards,
> Atul Kulkarni
>


Re: about mr-style merge sort

2015-09-10 Thread Raghavendra Pandey
In MR jobs, the output is sorted only within each reducer. That can be better
emulated by sorting each partition of the RDD rather than doing a total sort
of the RDD.
With RDD.mapPartitions you can sort the data within one partition and try...
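
Alongside the mapPartitions route, Spark also has a built-in that sorts during
the shuffle -- a minimal sketch, assuming pairRdd is an RDD[(K, V)] with an
Ordering on K and using a placeholder partitioner (for HFiles it would have to
match the region boundaries):

import org.apache.spark.HashPartitioner

// One shuffle that also sorts within each output partition, similar to the
// MR shuffle/merge-sort, so no separate total sortByKey pass is needed.
val sorted = pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(200))
// each partition of `sorted` can then be written out sequentially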
On Sep 11, 2015 7:36 AM, "周千昊"  wrote:

> Hi, all
>  Can anyone give some tips about this issue?
>
> 周千昊 wrote on Tue, Sep 8, 2015 at 4:46 PM:
>
>> Hi, community
>>  I have an application which I try to migrate from MR to Spark.
>>  It will do some calculations from Hive and output to hfile which
>> will be bulk load to HBase Table, details as follow:
>>
>>  Rdd input = getSourceInputFromHive()
>>  Rdd> mapSideResult =
>> input.glom().mapPartitions(/*some calculation*/)
>>  // PS: the result in each partition has already been sorted
>> according to the lexicographical order during the calculation
>>  mapSideResult.reduceByKey(/*some
>> aggregations*/).sortByKey(/**/).map(/*transform Tuple2 to
>> Tuple2*/).saveAsNewAPIHadoopFile(/*write
>> to hfile*/)
>>
>>   *Here is the problem, as in MR, in the reducer side, the mapper
>> output has already been sorted, so that it is a merge sort which makes
>> writing to hfile is sequential and fast.*
>> *  However in Spark, the output of reduceByKey phase has been
>> shuffled, so I have to sort the rdd in order to write hfile which makes it
>> slower 2x running on Spark than on MR.*
>> *  I am wondering that, if there is anything I can leverage has the
>> same effect as MR. I happen to see a JIRA
>> ticket https://issues.apache.org/jira/browse/SPARK-2926
>> . Is it related to what I
>> am looking for?*
>>
> --
> Best Regard
> ZhouQianhao
>


Re: Spark based Kafka Producer

2015-09-10 Thread Raghavendra Pandey
What is the value of the spark master conf? By default it is local, which
means only one thread can run, and that is why your job is stuck.
Specify local[*] to make the thread pool equal to the number of cores...
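
For reference, a minimal sketch of the conf side of that (applicationName is
from your code below):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName(applicationName)
  .setMaster("local[*]")  // or pass --master local[*] / a cluster URL to spark-submit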

Raghav
On Sep 11, 2015 6:06 AM, "Atul Kulkarni"  wrote:

> Hi Folks,
>
> Below is the code  have for Spark based Kafka Producer to take advantage
> of multiple executors reading files in parallel on my cluster but I am
> stuck at The program not making any progress.
>
> Below is my scrubbed code:
>
> val sparkConf = new SparkConf().setAppName(applicationName)
> val ssc = new StreamingContext(sparkConf, Seconds(2))
>
> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>
> val zipFileDStreams = ssc.textFileStream(inputFiles)
> zipFileDStreams.foreachRDD {
>   rdd =>
> rdd.foreachPartition(
>   partition => {
> partition.foreach{
>   case (logLineText) =>
> println(logLineText)
> producerObj.value.send(topics, logLineText)
> }
>   }
> )
> }
>
> ssc.start()
> ssc.awaitTermination()
>
> ssc.stop()
>
> The code for KafkaSink is as follows.
>
> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], 
> Array[Byte]]) extends Serializable {
>
>   lazy val producer = createProducer()
>   val logParser = new LogParser()
>
>   def send(topic: String, value: String): Unit = {
>
> val logLineBytes = 
> Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
> producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
> logLineBytes))
>   }
> }
>
> object KafkaSink {
>   def apply(config: Properties): KafkaSink = {
>
> val f = () => {
>   val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, 
> null, null)
>
>   sys.addShutdownHook {
> producer.close()
>   }
>   producer
> }
>
> new KafkaSink(f)
>   }
> }
>
> Disclaimer: it is based on the code inspired by
> http://allegro.tech/spark-kafka-integration.html.
>
> The job just sits there I cannot see any Job Stages being created.
> Something I want to mention - I I am trying to read gzipped files from HDFS
> - could it be that Streaming context is not able to read *.gz files?
>
>
> I am not sure what more details I can provide to help explain my problem.
>
>
> --
> Regards,
> Atul Kulkarni
>


Re: Parquet partitioning for unique identifier

2015-09-02 Thread Raghavendra Pandey
Did you specify a partitioning column while saving the data?
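
In case it helps, a rough sketch of partitioning by a derived low-cardinality
column instead of the unique id itself; df, id and the bucket count are made up:

import org.apache.spark.sql.functions.udf

val bucketOf = udf { (id: String) => math.abs(id.hashCode % 128) }

df.withColumn("bucket", bucketOf(df("id")))
  .write
  .partitionBy("bucket")  // 128 directories instead of one per unique id
  .parquet("/tmp/output.parquet")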
On Sep 3, 2015 5:41 AM, "Kohki Nishio"  wrote:

> Hello experts,
>
> I have a huge json file (> 40G) and trying to use Parquet as a file
> format. Each entry has a unique identifier but other than that, it doesn't
> have 'well balanced value' column to partition it. Right now it just throws
> OOM and couldn't figure out what to do with it.
>
> It would be ideal if I could provide a partitioner based on the unique
> identifier value like computing its hash value or something.  One of the
> option would be to produce a hash value and add it as a separate column,
> but it doesn't sound right to me. Is there any other ways I can try ?
>
> Regards,
> --
> Kohki Nishio
>


Re: Unable to run Group BY on Large File

2015-09-02 Thread Raghavendra Pandey
You can increase the number of partitions and try...
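
A minimal sketch of that knob for Spark SQL (1000 is an arbitrary value):

// number of partitions used for the shuffles behind joins/aggregations (default 200)
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")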
On Sep 3, 2015 5:33 AM, "Silvio Fiorito" 
wrote:

> Unfortunately, groupBy is not the most efficient operation. What is it
> you’re trying to do? It may be possible with one of the other *byKey
> transformations.
>
> From: "SAHA, DEBOBROTA"
> Date: Wednesday, September 2, 2015 at 7:46 PM
> To: "'user@spark.apache.org'"
> Subject: Unable to run Group BY on Large File
>
> Hi ,
>
>
>
> I am getting below error while I am trying to select data using SPARK SQL
> from a RDD table.
>
>
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> "Spark Context Cleaner" java.lang.InterruptedException
>
>
>
>
>
> The file or table size is around 113 GB and I am running SPARK 1.4 on a
> standalone cluster. Tried to extend the heap size but extending to 64GB
> also didn’t help.
>
>
>
> I would really appreciate any help on this.
>
>
>
> Thanks,
>
> Debobrota
>


Re: FlatMap Explanation

2015-09-02 Thread Raghavendra Pandey
flatMap is just like map, but it flattens out the Seq output of the
closure...
In your case, you call the "to" function, which returns a range:

a.to(b) returns List(a, ..., b)

So rdd.flatMap(x => x.to(3)) will take each element and return the range up
to 3.
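
A quick way to see it in spark-shell:

val rdd = sc.parallelize(Seq(1, 2, 3, 3))
rdd.flatMap(x => x.to(3)).collect()
// 1.to(3) = (1,2,3), 2.to(3) = (2,3), 3.to(3) = (3), 3.to(3) = (3)
// flattened together: Array(1, 2, 3, 2, 3, 3, 3)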
On Sep 3, 2015 7:36 AM, "Ashish Soni"  wrote:

> Hi ,
>
> Can some one please explain the output of the flat map
> data in RDD as below
> {1, 2, 3, 3}
>
> rdd.flatMap(x => x.to(3))
>
> output as below
>
> {1, 2, 3, 2, 3, 3, 3}
> i am not able to understand how the output came as above.
>
> Thanks,
>
>


Re: Spark Version upgrade issue: Exception in thread main java.lang.NoSuchMethodError

2015-08-30 Thread Raghavendra Pandey
Looks like your Jackson version and Spark's bundled Jackson package are at different versions.

Raghav
On Aug 28, 2015 4:01 PM, Manohar753 manohar.re...@happiestminds.com
wrote:

 Hi Team,
 I upgraded spark older versions to 1.4.1 after maven build i tried to ran
 my
 simple application but it failed and giving the below stacktrace.

 Exception in thread main java.lang.NoSuchMethodError:

 com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.addField(Lcom/fasterxml/jackson/databind/introspect/AnnotatedField;Lcom/fasterxml/jackson/databind/PropertyName;ZZZ)V
 at
 com.fasterxml.jackson.module.scala.introspect.ScalaPropertiesCollector.com
 $fasterxml$jackson$module$scala$introspect$ScalaPropertiesCollector$$_addField(ScalaPropertiesCollector.scala:109)

 i checked all the forex jackson versions but no luck



 Any help on this if some body already faced this issue.

 Thanks  in adcance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Version-upgrade-isue-Exception-in-thread-main-java-lang-NoSuchMethodError-tp24488.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Array Out OF Bound Exception

2015-08-29 Thread Raghavendra Pandey
So either you have an empty line at the end, or when you use String.split you
don't specify -1 as the second parameter...
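
The plain String.split behaviour behind this:

"A1|B1|".split("\\|")      // Array(A1, B1)     -- trailing empty field dropped
"A1|B1|".split("\\|", -1)  // Array(A1, B1, "") -- trailing empty field kept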
On Aug 29, 2015 1:18 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

 I suspect in the last scenario you are having an empty new line at the
 last line. If you put a try..catch you'd definitely know.

 Thanks
 Best Regards

 On Tue, Aug 25, 2015 at 2:53 AM, Michael Armbrust mich...@databricks.com
 wrote:

 This top line here is indicating that the exception is being throw from
 your code (i.e. code written in the console).

 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:40)


 Check to make sure that you are properly handling data that has less
 columns than you would expect.



 On Mon, Aug 24, 2015 at 12:41 PM, SAHA, DEBOBROTA ds3...@att.com wrote:

 Hi ,



 I am using SPARK 1.4 and I am getting an array out of bound Exception
 when I am trying to read from a registered table in SPARK.



 For example If I have 3 different text files with the content as below:



 *Scenario 1*:

 A1|B1|C1

 A2|B2|C2



 *Scenario 2*:

 A1| |C1

 A2| |C2



 *Scenario 3*:

 A1| B1|

 A2| B2|



 So for Scenario 1 and 2 it’s working fine but for Scenario 3 I am
 getting the following error:



 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage
 3.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 2

 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:40)

 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:38)

 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

 at scala.collection.Iterator$class.foreach(Iterator.scala:727)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)

 at scala.collection.AbstractIterator.to(Iterator.scala:1157)

 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

 at
 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

 at
 org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)

 at
 org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)

 at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)

 at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)

 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)

 at org.apache.spark.scheduler.Task.run(Task.scala:70)

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

 at java.lang.Thread.run(Thread.java:745)



 Driver stacktrace:

 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)

 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)

 at
 

Re: Alternative to Large Broadcast Variables

2015-08-29 Thread Raghavendra Pandey
We are using Cassandra for a similar kind of problem and it works well... You
need to take care of the race condition between updating the store and looking
it up...
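
A rough sketch of the lookup-per-partition pattern mentioned further down in
this thread, with a purely hypothetical KVClient (not a real Cassandra/HBase
API); records and rec.key are also placeholders:

val enriched = records.mapPartitions { iter =>
  val client = KVClient.connect("store-host")            // hypothetical client, one per partition
  val out = iter.map(rec => (rec, client.get(rec.key)))  // lookup inside the partition
    .toList                                              // materialize before closing
  client.close()
  out.iterator
}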
On Aug 29, 2015 1:31 AM, Ted Yu yuzhih...@gmail.com wrote:

 +1 on Jason's suggestion.

 bq. this large variable is broadcast many times during the lifetime

 Please consider making this large variable more granular. Meaning, reduce
 the amount of data transferred between the key value store and your app
 during update.

 Cheers

 On Fri, Aug 28, 2015 at 12:44 PM, Jason ja...@jasonknight.us wrote:

 You could try using an external key value store (like HBase, Redis) and
 perform lookups/updates inside of your mappers (you'd need to create the
 connection within a mapPartitions code block to avoid the connection
 setup/teardown overhead)?

 I haven't done this myself though, so I'm just throwing the idea out
 there.

 On Fri, Aug 28, 2015 at 3:39 AM Hemminger Jeff j...@atware.co.jp wrote:

 Hi,

 I am working on a Spark application that is using of a large (~3G)
 broadcast variable as a lookup table. The application refines the data in
 this lookup table in an iterative manner. So this large variable is
 broadcast many times during the lifetime of the application process.

 From what I have observed perhaps 60% of the execution time is spent
 waiting for the variable to broadcast in each iteration. My reading of a
 Spark performance article[1] suggests that the time spent broadcasting will
 increase with the number of nodes I add.

 My question for the group - what would you suggest as an alternative to
 broadcasting a large variable like this?

 One approach I have considered is segmenting my RDD and adding a copy of
 the lookup table for each X number of values to process. So, for example,
 if I have a list of 1 million entries to process (eg, RDD[Entry]), I could
 split this into segments of 100K entries, with a copy of the lookup table,
 and make that an RDD[(Lookup, Array[Entry]).

 Another solution I am looking at it is making the lookup table an RDD
 instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
 improve performance. One issue with this approach is that I would have to
 rewrite my application code to use two RDDs so that I do not reference the
 lookup RDD in the from within the closure of another RDD.

 Any other recommendations?

 Jeff


 [1]
 http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf

 [2]https://github.com/amplab/spark-indexedrdd





Re: org.apache.spark.shuffle.FetchFailedException

2015-08-25 Thread Raghavendra Pandey
Did you try increasing the SQL shuffle partitions (spark.sql.shuffle.partitions)?

On Tue, Aug 25, 2015 at 11:06 AM, kundan kumar iitr.kun...@gmail.com
wrote:

 I am running this query on a data size of 4 billion rows and
 getting org.apache.spark.shuffle.FetchFailedException error.

 select adid,position,userid,price
 from (
 select adid,position,userid,price,
 dense_rank() OVER (PARTITION BY adlocationid ORDER BY price DESC) as rank
 FROM trainInfo) as tmp
 WHERE rank = 2


 I have attached the error logs from spark-sql terminal.

 Please suggest what is the reason for these kind of errors and how can I
 resolve them.


 Regards,
 Kundan





Re: How to set environment of worker applications

2015-08-24 Thread Raghavendra Pandey
System properties and environment variables are two different things.. One
can use spark.executor.extraJavaOptions to pass system properties and
spark-env.sh to pass environment variables.
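
A small sketch of how the two show up on the executor side (myenvvar matches
the example below; MY_ENV_VAR is made up):

val fromSysProp = sys.props.get("myenvvar")  // set via spark.executor.extraJavaOptions=-Dmyenvvar=xxx
val fromEnvVar  = sys.env.get("MY_ENV_VAR")  // exported in spark-env.sh on each worker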

-raghav

On Mon, Aug 24, 2015 at 1:00 PM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 That's surprising. Passing the environment variables using
 spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then
 fetching them using System.getProperty(myenvvar) has worked for me.

 What is the error that you guys got?

 On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 spark-env.sh works for me in Spark 1.4 but not
 spark.executor.extraJavaOptions.

 On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I think the only way to pass on environment variables to worker node is
 to write it in spark-env.sh file on each worker node.

 On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with
 spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: How to set environment of worker applications

2015-08-23 Thread Raghavendra Pandey
I think the only way to pass on environment variables to worker node is to
write it in spark-env.sh file on each worker node.

On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to list all dataframes and RDDs available in current session?

2015-08-21 Thread Raghavendra Pandey
You can get the list of all the persisted RDDs using the Spark context...
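For example, from the shell or a notebook (a small sketch):

// sc.getPersistentRDDs returns a Map[Int, RDD[_]] of everything currently marked persistent
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"id=$id name=${rdd.name} storage=${rdd.getStorageLevel}")
}

Note this only covers RDDs/DataFrames that have been explicitly cached or persisted.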
On Aug 21, 2015 12:06 AM, Rishitesh Mishra rishi80.mis...@gmail.com
wrote:

 I am not sure if you can view all RDDs in a session. Tables are maintained
 in a catalogue . Hence its easier. However  you can see the DAG
 representation , which lists all the RDDs in a job , with Spark UI.
 On 20 Aug 2015 22:34, Dhaval Patel dhaval1...@gmail.com wrote:

 Apologies

 I accidentally included Spark User DL on BCC. The actual email message is
 below.
 =


 Hi:

 I have been working on few example using zeppelin.

 I have been trying to find a command that would list all
 *dataframes/RDDs* that has been created in current session. Anyone knows if
 there is any such commands available?

 Something similar to SparkSQL to list all temp tables :
   show tables;

 Thanks,
 Dhaval



 On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 Hi:

 I have been working on few example using zeppelin.

 I have been trying to find a command that would list all
 *dataframes/RDDs* that has been created in current session. Anyone knows if
 there is any such commands available?

 Something similar to SparkSQL to list all temp tables :
   show tables;

 Thanks,
 Dhaval





Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-21 Thread Raghavendra Pandey
Did you try with Hadoop version 2.7.1? It is known that s3a, which is available
in 2.7, works really well with Parquet. They fixed a lot of issues related to
metadata reading there...
On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!

 (table is partitioned by date_prefix and hour)
 explain select count(*) from test_table where date_prefix='20150819' and
 hour='00';

 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
  TungstenExchange SinglePartition
   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]
Scan ParquetRelation[ .. about 1000 partition paths go here ]

 Why does spark have to scan all partitions when the query only concerns
 with 1 partitions? Doesn't it defeat the purpose of partitioning?

 Thanks!

 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
 and I couldn't find much information about it online. What does it mean
 exactly to disable it? Are there any negative consequences to disabling it?

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Can you make some more profiling? I am wondering if the driver is busy
 with scanning the HDFS / S3.

 Like jstack pid of driver process



 And also, it’s will be great if you can paste the physical plan for the
 simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of
 CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to
 false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I guess the question is why does spark have to do partition discovery
 with all partitions when the query only needs to look at one partition? Is
 there a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I've had the same problem. It turns out that Spark (specifically
 parquet) is very slow at partition discovery. It got better in 1.5 (not yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned
 parquet table with only one partition (date=20140701). A simple `select
 count(*) from table where date=20140701` would run very fast (0.1 seconds).
 However, as I added more partitions the query takes longer and longer. When
 I added about 10,000 partitions, the query took way too long. I feel like
 querying for a single partition should not be affected by having more
 partitions. Is this a known behaviour? What does spark try to do here?



 Thanks,

 Jerrick












Re: Aggregate to array (or 'slice by key') with DataFrames

2015-08-21 Thread Raghavendra Pandey
Impact,
You can group the data by key and then aggregate on the timestamp, taking the
min to select the oldest value (or the max for the newest).
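A rough, untested sketch of that approach (the column names "key" and "ts" are placeholders):

import org.apache.spark.sql.functions._

// 1. the oldest timestamp per key
val oldest = df.groupBy("key").agg(min("ts").as("min_ts"))

// 2. join back to recover the full row that carries that timestamp
val oldestRows = df
  .join(oldest, df("key") === oldest("key") && df("ts") === oldest("min_ts"))
  .select(df.columns.map(c => df(c)): _*)   // keep only the original columns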
On Aug 21, 2015 11:15 PM, Impact nat...@skone.org wrote:

 I am also looking for a way to achieve the reducebykey functionality on
 data
 frames. In my case I need to select one particular row (the oldest, based
 on
 a timestamp column value) by key.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to specify column type when saving DataFrame as parquet file?

2015-08-14 Thread Raghavendra Pandey
I think you can try the DataFrame create API (sqlContext.createDataFrame) that
takes an RDD[Row] and a StructType schema...
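A rough sketch of that approach (the column names are made up; the all-null column "c" is the one forced to Long):

import org.apache.spark.sql.types._

// hypothetical JSON with columns "a" (strings) and "c" (all nulls, inferred as string)
val df = sqlContext.read.json("input.json")

// declare the schema you actually want, keeping the same column order as df
val schema = StructType(Seq(
  StructField("a", StringType, nullable = true),
  StructField("c", LongType, nullable = true)))

// reuse the underlying RDD[Row]; the nulls in "c" are valid for LongType as well
val typed = sqlContext.createDataFrame(df.rdd, schema)
typed.write.parquet("output.parquet")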
On Aug 11, 2015 4:28 PM, Jyun-Fan Tsai jft...@appier.com wrote:

 Hi all,
 I'm using Spark 1.4.1.  I create a DataFrame from json file.  There is
 a column C that all values are null in the json file.  I found that
 the datatype of column C in the created DataFrame is string.  However,
 I would like to specify the column as Long when saving it as parquet
 file.  What should I do to specify the column type when saving parquet
 file?

 Thank you,
 Jyun-Fan Tsai

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Left outer joining big data set with small lookups

2015-08-14 Thread Raghavendra Pandey
In Spark 1.4 there is a parameter to control that
(spark.sql.autoBroadcastJoinThreshold). Its default value is 10 MB. You also
need to cache your small DataFrames so the optimizer has a size hint.
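As a sketch (the threshold value and the table/column names are only examples):

// raise the size below which Spark SQL will broadcast a table in a join (default ~10 MB)
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)

// cache + materialize the small lookup so the planner has a size estimate for it
lookupDF.cache()
lookupDF.count()

val joined = factDF.join(lookupDF, factDF("dim_id") === lookupDF("id"), "left_outer")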
On Aug 14, 2015 7:09 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io
wrote:

 Hi

 I am facing huge performance problem when I am trying to left outer join
 very big data set (~140GB) with bunch of small lookups [Start schema type].
 I am using data frame  in spark sql. It looks like data is shuffled and
 skewed when that join happens. Is there any way to improve performance of
 such type of join in spark?

 How can I hint optimizer to go with replicated join etc., to avoid
 shuffle? Would it help to create broadcast variables on small lookups?  If
 I create broadcast variables, how can I convert them into data frame and
 use them in sparksql type of join?

 Thanks
 Vijay
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: DataFrame column structure change

2015-08-08 Thread Raghavendra Pandey
You can use the struct function of the org.apache.spark.sql.functions object to
combine two columns into a struct column.
Something like:
val nestedCol = struct(df("d"), df("e")).as("newCol")
df.select(df("a"), df("b"), df("c"), nestedCol)
On Aug 7, 2015 3:14 PM, Rishabh Bhardwaj rbnex...@gmail.com wrote:

 I am doing it by creating a new data frame out of the fields to be nested
 and then join with the original DF.
 Looking for some optimized solution here.

 On Fri, Aug 7, 2015 at 2:06 PM, Rishabh Bhardwaj rbnex...@gmail.com
 wrote:

 Hi all,

 I want to have some nesting structure from the existing columns of
 the dataframe.
 For that,,I am trying to transform a DF in the following way,but couldn't
 do it.

 scala df.printSchema
 root
  |-- a: string (nullable = true)
  |-- b: string (nullable = true)
  |-- c: string (nullable = true)
  |-- d: string (nullable = true)
  |-- e: string (nullable = true)
  |-- f: string (nullable = true)

 *To*

 scala newDF.printSchema
 root
  |-- a: string (nullable = true)
  |-- b: string (nullable = true)
  |-- c: string (nullable = true)
  |-- newCol: struct (nullable = true)
  ||-- d: string (nullable = true)
  ||-- e: string (nullable = true)


 help me.

 Regards,
 Rishabh.





Spark sql jobs n their partition

2015-08-08 Thread Raghavendra Pandey
I have complex transformation requirements that I am implementing using
DataFrames. It involves a lot of joins, including joins with a Cassandra table.
I was wondering how I can debug the jobs and stages queued by Spark SQL the
way I can for RDDs.

In one of the cases, Spark SQL creates more than 17 lakh (1.7 million) tasks
for 2 GB of data, even though I have set the SQL shuffle partitions to 32.

Raghav


Create StructType column in data frame

2015-07-27 Thread Raghavendra Pandey
Hello,

I would like to add a column of StructType to DataFrame.
What would be the best way to do it? I am not sure if it is possible using
withColumn. A possible way is to convert the DataFrame into an RDD[Row], add
the struct, and then convert it back to a DataFrame. But that seems like
overkill.

Please note that I don't know the StructType beforehand; I am creating it
based on some configuration, so using case classes is out of the picture.

Thanks.


Re: Will multiple filters on the same RDD optimized to one filter?

2015-07-16 Thread Raghavendra Pandey
If you cache the rdd, it will save some recomputation. But anyway, filter is a
lazy operation; what actually runs depends on what you do later on with the two
filtered RDDs...

Raghavendra
On Jul 16, 2015 1:33 PM, Bin Wang wbi...@gmail.com wrote:

 If I write code like this:

 val rdd = input.map(_.value)
 val f1 = rdd.filter(_ == 1)
 val f2 = rdd.filter(_ == 2)
 ...

 Then the DAG of the execution may be this:

  - Filter - ...
 Map
  - Filter - ...

 But the two filters is operated on the same RDD, which means it could be
 done by just scan the RDD once. Does spark have this kind optimization for
 now?



Re: Will multiple filters on the same RDD optimized to one filter?

2015-07-16 Thread Raghavendra Pandey
Depending on what you do with them, they will get computed separately, because
you may have a long DAG in each branch. Spark tries to run all the
transformation functions within a branch together rather than trying to
optimize things across branches.
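If you do end up using both branches, caching the shared parent avoids recomputing the map for each filter, e.g.:

val rdd = input.map(_.value).cache()   // computed once, then served from the cache
val f1 = rdd.filter(_ == 1)
val f2 = rdd.filter(_ == 2)
// actions on f1 and f2 now scan the cached data instead of re-running the map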
On Jul 16, 2015 1:40 PM, Bin Wang wbi...@gmail.com wrote:

 What if I would use both rdd1 and rdd2 later?

 Raghavendra Pandey raghavendra.pan...@gmail.com于2015年7月16日周四 下午4:08写道:

 If you cache rdd it will save some operations. But anyway filter is a
 lazy operation. And it runs based on what you will do later on with rdd1
 and rdd2...

 Raghavendra
 On Jul 16, 2015 1:33 PM, Bin Wang wbi...@gmail.com wrote:

 If I write code like this:

 val rdd = input.map(_.value)
 val f1 = rdd.filter(_ == 1)
 val f2 = rdd.filter(_ == 2)
 ...

 Then the DAG of the execution may be this:

  - Filter - ...
 Map
  - Filter - ...

 But the two filters is operated on the same RDD, which means it could be
 done by just scan the RDD once. Does spark have this kind optimization for
 now?




Re: Filter on Grouped Data

2015-07-03 Thread Raghavendra Pandey
Why don't you apply the filter first and then group the data and run
aggregations?
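For example, something like this (untested; the receiver address is made up and "To" is assumed to be a plain string column):

import org.apache.spark.sql.functions._

val receiver = "some.receiver@example.com"
df.filter(col("To").contains(receiver))   // filter first
  .groupBy("From")                        // then group
  .count()
  .orderBy(desc("count"))
  .limit(1)
  .show()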
On Jul 3, 2015 1:29 PM, Megha Sridhar- Cynepia megha.sridh...@cynepia.com
wrote:

 Hi,


 I have a Spark DataFrame object, which when trimmed, looks like,



 FromTo  SubjectMessage-ID
 karen@xyz.com['vance.me...@enron.com', SEC Inquiry
 19952575.1075858
  'jeannie.mandel...@enron.com',
  'mary.cl...@enron.com',
  'sarah.pal...@enron.com']



 elyn.hug...@xyz.com['dennis.ve...@enron.com',Revised
 documents33499184.1075858
  'gina.tay...@enron.com',
  'kelly.kimbe...@enron.com']
 .
 .
 .


 I have run a groupBy(From) on the above dataFrame and obtained a
 GroupedData object as a result. I need to apply a filter on the grouped
 data (for instance, getting the sender who sent maximum number of the mails
 that were addressed to a particular receiver in the To list).
 Is there a way to accomplish this by applying filter on grouped data?


 Thanks,
 Megha


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Streaming: updating broadcast variables

2015-07-03 Thread Raghavendra Pandey
You cannot update a broadcast variable; the change won't get reflected on the
workers.
On Jul 3, 2015 12:18 PM, James Cole ja...@binarism.net wrote:

 Hi all,

 I'm filtering a DStream using a function. I need to be able to change this
 function while the application is running (I'm polling a service to see if
 a user has changed their filtering). The filter function is a
 transformation and runs on the workers, so that's where the updates need to
 go. I'm not sure of the best way to do this.

 Initially broadcasting seemed like the way to go: the filter is actually
 quite large. But I don't think I can update something I've broadcasted.
 I've tried unpersisting and re-creating the broadcast variable but it
 became obvious this wasn't updating the reference on the worker. So am I
 correct in thinking I can't use broadcasted variables for this purpose?

 The next option seems to be: stopping the JavaStreamingContext, creating a
 new one from the SparkContext, updating the filter function, and
 re-creating the DStreams (I'm using direct streams from Kafka).

 If I re-created the JavaStreamingContext would the accumulators (which are
 created from the SparkContext) keep working? (Obviously I'm going to try
 this soon)

 In summary:

 1) Can broadcasted variables be updated?

 2) Is there a better way than re-creating the JavaStreamingContext and
 DStreams?

 Thanks,

 James




Re: Optimizations

2015-07-03 Thread Raghavendra Pandey
This is the basic design of Spark: it runs such operations in different
stages...
Not sure you can achieve what you are looking for.
On Jul 3, 2015 12:43 PM, Marius Danciu marius.dan...@gmail.com wrote:

 Hi all,

 If I have something like:

 rdd.join(...).mapPartitionToPair(...)

 It looks like mapPartitionToPair runs in a different stage then join. Is
 there a way to piggyback this computation inside the join stage ? ... such
 that each result partition after join is passed to
 the mapPartitionToPair function, all running in the same state without any
 other costs.

 Best,
 Marius



Re: Are Spark Streaming RDDs always processed in order?

2015-07-03 Thread Raghavendra Pandey
I don't think you can expect any ordering guarantee except for the records
within one partition.
 On Jul 4, 2015 7:43 AM, khaledh khal...@gmail.com wrote:

 I'm writing a Spark Streaming application that uses RabbitMQ to consume
 events. One feature of RabbitMQ that I intend to make use of is bulk ack of
 messages, i.e. no need to ack one-by-one, but only ack the last event in a
 batch and that would ack the entire batch.

 Before I commit to doing so, I'd like to know if Spark Streaming always
 processes RDDs in the same order they arrive in, i.e. if RDD1 arrives
 before
 RDD2, is it true that RDD2 will never be scheduled/processed before RDD1 is
 finished?

 This is crucial to the ack logic, since if RDD2 can be potentially
 processed
 while RDD1 is still being processed, then if I ack the the last event in
 RDD2 that would also ack all events in RDD1, even though they may have not
 been completely processed yet.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Are-Spark-Streaming-RDDs-always-processed-in-order-tp23616.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark SQL and Streaming - How to execute JDBC Query only once

2015-07-02 Thread Raghavendra Pandey
This will not work, i.e. using a DataFrame inside a map function.
You can, however, create the DataFrame separately and cache it...
Then you can join your event stream with this DataFrame.
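Roughly like this (an untested sketch; jdbcOptions is your JDBC option map and the table/column names are made up):

// load and cache the lookup table once, outside the per-batch processing
val users = sqlContext.read.format("jdbc").options(jdbcOptions).load().cache()
users.count()   // force the load so later batches reuse the cached data

eventStream.foreachRDD { rdd =>
  import sqlContext.implicits._
  val events = rdd.toDF()   // assumes an RDD of a case class with a loginName field
  events.join(users, events("loginName") === users("login_nm"), "left_outer").show()
}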
On Jul 2, 2015 6:11 PM, Ashish Soni asoni.le...@gmail.com wrote:

 Hi All  ,

 I have and Stream of Event coming in and i want to fetch some additional
 data from the database based on the values in the incoming data , For Eg
 below is the data coming in

 loginName
 Email
 address
 city

 Now for each login name i need to go to oracle database and get the userId
 from the database *but i do not want to hit the database again and again
 instead i want to load the complete table in memory and then find the user
 id based on the incoming data*

 JavaRDDCharge rdd =
 sc.textFile(/home/spark/workspace/data.csv).map(new FunctionString,
 String() {
 @Override
 public Charge call(String s) {
 String str[] = s.split(,);
 *//How to load the complete table in memory and use it as
 when i do outside the loop i get stage failure error *
 *   DataFrame dbRdd =
 sqlContext.read().format(jdbc).options(options).load();*

 System.out.println(dbRdd.filter(ogin_nm='+str[0]+').count());

   return str[0];
 }
 });


 How i can achieve this , Please suggest

 Thanks



Re: DataFrame Filter Inside Another Data Frame Map

2015-07-02 Thread Raghavendra Pandey
You can collect the DataFrame as an array and then create a map out of it...
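Roughly like this (untested; the column and field names are made up):

// collect the small account table to the driver, build a Map, and broadcast it
val accounts = accountDF.collect()
  .map(r => r.getAs[String]("login") -> r.getAs[Int]("user_id"))
  .toMap
val accountsBc = sc.broadcast(accounts)

// inside map functions on the executors it is then a plain in-memory lookup
val enriched = chargeRdd.map(ch => (ch, accountsBc.value.get(ch.login)))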
On Jul 2, 2015 9:23 AM, asoni.le...@gmail.com wrote:

 Any example how can i return a Hashmap from data frame ?

 Thanks ,
 Ashish

 On Jul 1, 2015, at 11:34 PM, Holden Karau hol...@pigscanfly.ca wrote:

 Collecting it as a regular (Java/scala/Python) map. You can also broadcast
 the map if your going to use it multiple times.

 On Wednesday, July 1, 2015, Ashish Soni asoni.le...@gmail.com wrote:

 Thanks , So if i load some static data from database and then i need to
 use than in my map function to filter records what will be the best way to
 do it,

 Ashish

 On Wed, Jul 1, 2015 at 10:45 PM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 You cannot refer to one rdd inside another rdd.map function...
 Rdd object is not serialiable. Whatever objects you use inside map
 function  should be serializable as they get transferred to executor nodes.
 On Jul 2, 2015 6:13 AM, Ashish Soni asoni.le...@gmail.com wrote:

 Hi All  ,

 I am not sure what is the wrong with below code as it give below error
 when i access inside the map but it works outside

 JavaRDDCharge rdd2 = rdd.map(new FunctionCharge, Charge() {


 @Override
 public Charge call(Charge ch) throws Exception {


* DataFrame df = accountRdd.filter(login=test);*

 return ch;
 }

 });

 5/07/01 20:38:08 ERROR Executor: Exception in task 0.0 in stage 0.0
 (TID 0)
 java.lang.NullPointerException
 at org.apache.spark.sql.DataFrame.init(DataFrame.scala:129)
 at org.apache.spark.sql.DataFrame.org
 $apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)




 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau




Re: StorageLevel.MEMORY_AND_DISK_SER

2015-07-01 Thread Raghavendra Pandey
So do you want to change the behavior of the persist API, or write the RDD to
disk?
On Jul 1, 2015 9:13 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I think i want to use persist then and write my intermediate RDDs to
 disk+mem.

 On Wed, Jul 1, 2015 at 8:28 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I think persist api is internal to rdd whereas write api is for saving
 content on dist.
 Rdd persist will dump your obj bytes serialized on the disk.. If you
 wanna change that behavior you need to override the class serialization
 that your are storing in rdd..
  On Jul 1, 2015 8:50 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 This is my write API. how do i integrate it here.


  protected def writeOutputRecords(detailRecords:
 RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
 val writeJob = new Job()
 val schema = SchemaUtil.outputSchema(_detail)
 AvroJob.setOutputKeySchema(writeJob, schema)
 val outputRecords = detailRecords.coalesce(100)
 outputRecords.saveAsNewAPIHadoopFile(outputDir,
   classOf[AvroKey[GenericRecord]],
   classOf[org.apache.hadoop.io.NullWritable],
   classOf[AvroKeyOutputFormat[GenericRecord]],
   writeJob.getConfiguration)
   }

 On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers ko...@tresata.com wrote:

 rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

 On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ?


 --
 Deepak





 --
 Deepak




 --
 Deepak




Re: Issue with parquet write after join (Spark 1.4.0)

2015-07-01 Thread Raghavendra Pandey
By any chance, are you using a timestamp field in your df? Timestamp fields are
known to be notorious in RDD conversion.
On Jul 1, 2015 6:13 PM, Pooja Jain pooja.ja...@gmail.com wrote:

 Join is happening successfully as I am able to do count() after the join.

 Error is coming only while trying to write in parquet format on hdfs.

 Thanks,
 Pooja.

 On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It says:

 Caused by: java.net.ConnectException: Connection refused: slave2/...:54845

 Could you look in the executor logs (stderr on slave2) and see what made
 it shut down? Since you are doing a join there's a high possibility of OOM
 etc.


 Thanks
 Best Regards

 On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain pooja.ja...@gmail.com
 wrote:

 Hi,

 We are using Spark 1.4.0 on hadoop using yarn-cluster mode via
 spark-submit. We are facing parquet write issue after doing dataframe joins

 We have a full data set and then an incremental data. We are reading
 them as dataframes, joining them, and then writing the data to the hdfs
 system in parquet format. We are getting the timeout error on the last
 partition.

 But if we do a count on the joined data it is working - which gives us
 the confidence that join is happening properly. Only in case of writing to
 the hdfs it is timing out.

 Code flow:

 // join two data frames - dfBase and dfIncr on primaryKey
 val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === 
 dfIncr(primaryKey), outer)

 // applying a reduce function on each row.
 val mergedDF = joinedDF.map(x =
   reduceFunc(x)
 )

 //converting back to dataframe
 val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)

 //writing to parquet file
 newdf.write.parquet(hdfsfilepath)

 Getting following exception:

 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with 
 no recent heartbeats: 255766 ms exceeds timeout 24 ms
 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on 
 slave2: Executor heartbeat timed out after 255766 ms
 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 
 from TaskSet 7.0
 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 
 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 
 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to 
 kill executor(s) 26
 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove 
 executor 26 from BlockManagerMaster.
 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block 
 manager BlockManagerId(26, slave2, 54845)
 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully 
 in removeExecutor
 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number 
 of 26 executor(s).
 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now 
 unavailable on executor 26 (193/200, false)
 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested 
 to kill executor(s) 26.
 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated 
 or disconnected! Shutting down. slave2:51849
 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on 
 slave2: remote Rpc client disassociated
 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 
 from TaskSet 7.0
 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove 
 executor 26 from BlockManagerMaster.
 15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully 
 in removeExecutor
 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with 
 remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address 
 is now gated for [5000] ms. Reason is: [Disassociated].
 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated 
 or disconnected! Shutting down. slave2:51849
 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 
 (TID 310, slave2): org.apache.spark.SparkException: Task failed while 
 writing rows.
 at 
 org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
 at 
 org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
 at 
 org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at 
 

Re: StorageLevel.MEMORY_AND_DISK_SER

2015-07-01 Thread Raghavendra Pandey
For that you need to change the serialize and deserialize behavior of your
class.
Preferably, you can use Kryo serializers and override the behavior.
For details you can look at
https://github.com/EsotericSoftware/kryo/blob/master/README.md
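For instance, registering the classes you keep in the RDD with Kryo (a sketch):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // register your record classes; a custom Kryo Serializer can also be plugged in
  // through a KryoRegistrator and the spark.kryo.registrator setting
  .registerKryoClasses(Array(classOf[DetailOutputRecord]))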
On Jul 1, 2015 9:26 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

i original assumed that persisting is similar to writing. But its not.
Hence i want to change the behavior of intermediate persists.

On Wed, Jul 1, 2015 at 8:46 AM, Raghavendra Pandey 
raghavendra.pan...@gmail.com wrote:

 So do you want to change the behavior of persist api or write the rdd on
 disk...
 On Jul 1, 2015 9:13 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I think i want to use persist then and write my intermediate RDDs to
 disk+mem.

 On Wed, Jul 1, 2015 at 8:28 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I think persist api is internal to rdd whereas write api is for saving
 content on dist.
 Rdd persist will dump your obj bytes serialized on the disk.. If you
 wanna change that behavior you need to override the class serialization
 that your are storing in rdd..
  On Jul 1, 2015 8:50 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 This is my write API. how do i integrate it here.


  protected def writeOutputRecords(detailRecords:
 RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
 val writeJob = new Job()
 val schema = SchemaUtil.outputSchema(_detail)
 AvroJob.setOutputKeySchema(writeJob, schema)
 val outputRecords = detailRecords.coalesce(100)
 outputRecords.saveAsNewAPIHadoopFile(outputDir,
   classOf[AvroKey[GenericRecord]],
   classOf[org.apache.hadoop.io.NullWritable],
   classOf[AvroKeyOutputFormat[GenericRecord]],
   writeJob.getConfiguration)
   }

 On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers ko...@tresata.com
 wrote:

 rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

 On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ?


 --
 Deepak





 --
 Deepak




 --
 Deepak




-- 
Deepak


Re: Can a Spark Driver Program be a REST Service by itself?

2015-07-01 Thread Raghavendra Pandey
I am using the Spark driver as a REST service. I used spray.io to make my app
a REST server.

I think this is a good design for applications that you want to keep in
long-running mode.
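A stripped-down sketch of that setup (spray 1.3 style; the object name, port and route are illustrative):

import akka.actor.ActorSystem
import org.apache.spark.{SparkConf, SparkContext}
import spray.routing.SimpleRoutingApp

object DriverService extends App with SimpleRoutingApp {
  implicit val system = ActorSystem("driver-service")
  val sc = new SparkContext(new SparkConf().setAppName("rest-driver"))

  startServer(interface = "0.0.0.0", port = 8080) {
    path("count") {
      get {
        // any Spark action can be triggered from a request handler on the driver
        complete(sc.parallelize(1 to 1000).count().toString)
      }
    }
  }
}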
On Jul 1, 2015 6:28 PM, Arush Kharbanda ar...@sigmoidanalytics.com
wrote:

 You can try using Spark Jobserver

 https://github.com/spark-jobserver/spark-jobserver

 On Wed, Jul 1, 2015 at 4:32 PM, Spark Enthusiast sparkenthusi...@yahoo.in
  wrote:

 Folks,

 My Use case is as follows:

 My Driver program will be aggregating a bunch of Event Streams and acting
 on it. The Action on the aggregated events is configurable and can change
 dynamically.

 One way I can think of is to run the Spark Driver as a Service where a
 config push can be caught via an API that the Driver exports.
 Can I have a Spark Driver Program run as a REST Service by itself? Is
 this a common use case?
 Is there a better way to solve my problem?

 Thanks




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com



Re: BroadCast Multiple DataFrame ( JDBC Tables )

2015-07-01 Thread Raghavendra Pandey
I am not sure you can broadcast a DataFrame without collecting it on the
driver...
On Jul 1, 2015 11:45 PM, Ashish Soni asoni.le...@gmail.com wrote:

 Hi ,

 I need to load 10 tables in memory and have them available to all the
 workers , Please let me me know what is the best way to do broadcast them

 sc.broadcast(df)  allow only one

 Thanks,





Re: DataFrame Filter Inside Another Data Frame Map

2015-07-01 Thread Raghavendra Pandey
You cannot refer to one RDD inside another RDD's map function...
RDD objects are not serializable. Whatever objects you use inside a map
function should be serializable, as they get transferred to executor nodes.
On Jul 2, 2015 6:13 AM, Ashish Soni asoni.le...@gmail.com wrote:

 Hi All  ,

 I am not sure what is the wrong with below code as it give below error
 when i access inside the map but it works outside

 JavaRDDCharge rdd2 = rdd.map(new FunctionCharge, Charge() {

 @Override
 public Charge call(Charge ch) throws Exception {


* DataFrame df = accountRdd.filter(login=test);*

 return ch;
 }

 });

 5/07/01 20:38:08 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.NullPointerException
 at org.apache.spark.sql.DataFrame.init(DataFrame.scala:129)
 at org.apache.spark.sql.DataFrame.org
 $apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)



Re: Using 'fair' scheduler mode

2015-04-01 Thread Raghavendra Pandey
I am facing the same issue. FAIR and FIFO are behaving in the same way.

On Wed, Apr 1, 2015 at 1:49 AM, asadrao as...@microsoft.com wrote:

 Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the
 first query is a very expensive query (ex: ‘select *’ on a really big data
 set) than any subsequent query seem to get blocked. I would have expected
 the second query to run in parallel since I am using the ‘fair’ scheduler
 mode not the ‘fifo’. I am submitting the query through thrift server.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Using-fair-scheduler-mode-tp22328.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Executor lost with too many temp files

2015-02-25 Thread Raghavendra Pandey
Can you try increasing the ulimit -n (max open file descriptors) on your machines?

On Mon, Feb 23, 2015 at 10:55 PM, Marius Soutier mps@gmail.com wrote:

 Hi Sameer,

 I’m still using Spark 1.1.1, I think the default is hash shuffle. No
 external shuffle service.

 We are processing gzipped JSON files, the partitions are the amount of
 input files. In my current data set we have ~850 files that amount to 60 GB
 (so ~600 GB uncompressed). We have 5 workers with 8 cores and 48 GB RAM
 each. We extract five different groups of data from this to filter, clean
 and denormalize (i.e. join) it for easier downstream processing.

 By the way this code does not seem to complete at all without using
 coalesce() at a low number, 5 or 10 work great. Everything above that make
 it very likely it will crash, even on smaller datasets (~300 files). But
 I’m not sure if this is related to the above issue.


 On 23.02.2015, at 18:15, Sameer Farooqui same...@databricks.com wrote:

 Hi Marius,

 Are you using the sort or hash shuffle?

 Also, do you have the external shuffle service enabled (so that the Worker
 JVM or NodeManager can still serve the map spill files after an Executor
 crashes)?

 How many partitions are in your RDDs before and after the problematic
 shuffle operation?



 On Monday, February 23, 2015, Marius Soutier mps@gmail.com wrote:

 Hi guys,

 I keep running into a strange problem where my jobs start to fail with
 the dreaded Resubmitted (resubmitted due to lost executor)” because of
 having too many temp files from previous runs.

 Both /var/run and /spill have enough disk space left, but after a given
 amount of jobs have run, following jobs will struggle with completion.
 There are a lot of failures without any exception message, only the above
 mentioned lost executor. As soon as I clear out /var/run/spark/work/ and
 the spill disk, everything goes back to normal.

 Thanks for any hint,
 - Marius


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Shuffle read/write issue in spark 1.2

2015-02-05 Thread Raghavendra Pandey
I observed the same issue as well.

On Fri, Feb 6, 2015 at 12:19 AM, Praveen Garg praveen.g...@guavus.com
wrote:

  Hi,

  While moving from spark 1.1 to spark 1.2, we are facing an issue where
 Shuffle read/write has been increased significantly. We also tried running
 the job by rolling back to spark 1.1 configuration where we set
 spark.shuffle.manager to hash and spark.shuffle.blockTransferService to
 nio. It did improve the performance a bit but it was still much worse than
 spark 1.1. The scenario seems similar to the bug raised sometime back
 https://issues.apache.org/jira/browse/SPARK-5081.
 Has anyone come across any similar issue? Please tell us if any
 configuration change can help.

  Regards, Praveen




Re: Cheapest way to materialize an RDD?

2015-02-02 Thread Raghavendra Pandey
You can also do something like
rdd.sparkContext.runJob(rdd, (iter: Iterator[T]) => {
  while(iter.hasNext) iter.next()
})

On Sat, Jan 31, 2015 at 5:24 AM, Sean Owen so...@cloudera.com wrote:

 Yeah, from an unscientific test, it looks like the time to cache the
 blocks still dominates. Saving the count is probably a win, but not
 big. Well, maybe good to know.

 On Fri, Jan 30, 2015 at 10:47 PM, Stephen Boesch java...@gmail.com
 wrote:
  Theoretically your approach would require less overhead - i.e. a collect
 on
  the driver is not required as the last step.  But maybe the difference is
  small and that particular path may or may not have been properly
 optimized
  vs the count(). Do you have a biggish data set to compare the timings?
 
  2015-01-30 14:42 GMT-08:00 Sean Owen so...@cloudera.com:
 
  So far, the canonical way to materialize an RDD just to make sure it's
  cached is to call count(). That's fine but incurs the overhead of
  actually counting the elements.
 
  However, rdd.foreachPartition(p = None) for example also seems to
  cause the RDD to be materialized, and is a no-op. Is that a better way
  to do it or am I not thinking of why it's insufficient?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Is there a way to delete hdfs file/directory using spark API?

2015-01-21 Thread Raghavendra Pandey
You can use the Hadoop client API to remove files:
https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#delete(org.apache.hadoop.fs.Path,
boolean). I don't think Spark has any wrapper over the Hadoop FileSystem APIs.
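For example, from inside a Spark application:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path("/path/to/remove"), true)   // true = recursive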

On Thu, Jan 22, 2015 at 12:15 PM, LinQili lin_q...@outlook.com wrote:

 Hi, all
 I wonder how to delete hdfs file/directory using spark API?



Re: How to get the master URL at runtime inside driver program?

2015-01-19 Thread Raghavendra Pandey
If you pass the Spark master URL to spark-submit, you don't need to pass it to
the SparkConf object again. You can create the SparkConf without this property,
or for that matter without any other property that you pass to spark-submit.
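A small sketch (the app and jar names are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// submitted with: spark-submit --master spark://host:7077 --class MyApp my-app.jar
val conf = new SparkConf().setAppName("MyApp")   // note: no setMaster(...) here
val sc = new SparkContext(conf)
println(sc.master)   // the master URL supplied by spark-submit, available at runtime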

On Sun, Jan 18, 2015 at 7:38 AM, guxiaobo1982 guxiaobo1...@qq.com wrote:

 Hi,

 Driver programs submitted by the spark-submit script will get the runtime
 spark master URL, but how it get the URL inside the main method when
 creating the SparkConf object?

 Regards,




Re: ALS.trainImplicit running out of mem when using higher rank

2015-01-18 Thread Raghavendra Pandey
If you are running Spark in local mode, executor parameters are not used, as
there is no separate executor. You should set the corresponding driver
parameters instead for them to take effect.

On Mon, Jan 19, 2015, 00:21 Sean Owen so...@cloudera.com wrote:

 OK. Are you sure the executor has the memory you think? -Xmx24g in
 its command line? It may be that for some reason your job is reserving
 an exceptionally large amount of non-heap memory. I am not sure that's
 to be expected with the ALS job though. Even if the settings work,
 considering using the explicit command line configuration.

 On Sat, Jan 17, 2015 at 12:49 PM, Antony Mayi antonym...@yahoo.com
 wrote:
  the values are for sure applied as expected - confirmed using the spark
 UI
  environment page...
 
  it comes from my defaults configured using
  'spark.yarn.executor.memoryOverhead=8192' (yes, now increased even
 more) in
  /etc/spark/conf/spark-defaults.conf and 'export
 SPARK_EXECUTOR_MEMORY=24G'
  in /etc/spark/conf/spark-env.sh

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Save RDD with partition information

2015-01-13 Thread Raghavendra Pandey
I believe the default hash partitioner logic in Spark will send all records
with the same key to the same partition, and hence to the same machine.
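For example, partitioning every hourly pair RDD with the same partitioner (a sketch; the RDD names are made up):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(64)           // same partitioner for every hour
val hour00 = hourlyPairs00.partitionBy(partitioner)
val hour01 = hourlyPairs01.partitionBy(partitioner)
// the same key hashes to the same partition index in both RDDs, so joining or
// cogrouping RDDs that share this partitioner can avoid a full shuffle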

On Wed, Jan 14, 2015, 03:03 Puneet Kapoor puneet.cse.i...@gmail.com wrote:

 Hi,

 I have a usecase where in I have hourly spark job which creates hourly
 RDDs, which are partitioned by keys.

 At the end of the day I need to access all of these RDDs and combine the
 Key/Value pairs over the day.

 If there is a key K1 in RDD0 (1st hour of day), RDD1 ... RDD23(last hour
 of the day); we need to combine all the values of this K1 using some logic.

 What I want to do is to avoid the shuffling at the end of the day since
 the data in huge ~ hundreds of GB.

 Questions
 ---
 1.) Is there a way that i can persist hourly RDDs with partition
 information and then while reading back the RDDs the partition information
 is restored.
 2.) Can i ensure that partitioning is similar for different hours. Like if
 K1 goes to container_X, it would go to the same container in the next hour
 and so on.

 Regards
 Puneet




Re: Web Service + Spark

2015-01-11 Thread Raghavendra Pandey
You can take a look at http://zeppelin.incubator.apache.org. It is a notebook
with built-in graphical visualization.

On Sun, Jan 11, 2015, 01:45 Cui Lin cui@hds.com wrote:

  Thanks, Gaurav and Corey,

  Probably I didn’t make myself clear. I am looking for best Spark
 practice similar to Shiny for R, the analysis/visualziation results can be
 easily published to web server and shown from web browser. Or any dashboard
 for Spark?

  Best regards,

  Cui Lin

   From: gtinside gtins...@gmail.com
 Date: Friday, January 9, 2015 at 7:45 PM
 To: Corey Nolet cjno...@gmail.com
 Cc: Cui Lin cui@hds.com, user@spark.apache.org 
 user@spark.apache.org
 Subject: Re: Web Service + Spark

   You can also look at Spark Job Server
 https://github.com/spark-jobserver/spark-jobserver

 - Gaurav

 On Jan 9, 2015, at 10:25 PM, Corey Nolet cjno...@gmail.com wrote:

   Cui Lin,

  The solution largely depends on how you want your services deployed
 (Java web container, Spray framework, etc...) and if you are using a
 cluster manager like Yarn or Mesos vs. just firing up your own executors
 and master.

  I recently worked on an example for deploying Spark services inside of
 Jetty using Yarn as the cluster manager. It forced me to learn how Spark
 wires up the dependencies/classpaths. If it helps, the example that
 resulted from my tinkering is located at [1].


  [1] https://github.com/calrissian/spark-jetty-server

 On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin cui@hds.com wrote:

  Hello, All,

  What’s the best practice on deploying/publishing spark-based scientific
 applications into a web service? Similar to Shiny on R.
  Thanks!

  Best regards,

  Cui Lin





Re: Spark SQL: Storing AVRO Schema in Parquet

2015-01-11 Thread Raghavendra Pandey
I think the AvroWriteSupport class already saves the Avro schema as part of the
Parquet metadata. You can consider using parquet-mr
https://github.com/Parquet/parquet-mr directly.

Raghavendra

On Fri, Jan 9, 2015 at 10:32 PM, Jerry Lam chiling...@gmail.com wrote:

 Hi Raghavendra,

 This makes a lot of sense. Thank you.
 The problem is that I'm using Spark SQL right now to generate the parquet
 file.

 What I think I need to do is to use Spark directly and transform all rows
 from SchemaRDD to avro objects and supply it to use saveAsNewAPIHadoopFile
 (from the PairRDD). From there, I can supply the avro schema to parquet via
 AvroParquetOutputFormat.

 It is not difficult just not as simple as I would like because SchemaRDD
 can write to Parquet file using its schema and if I can supply the avro
 schema to parquet, it save me the transformation step for avro objects.

 I'm thinking of overriding the saveAsParquetFile method to allows me to
 persist the avro schema inside parquet. Is this possible at all?

 Best Regards,

 Jerry


 On Fri, Jan 9, 2015 at 2:05 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I cam across this
 http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/. You can take
 a look.


 On Fri Jan 09 2015 at 12:08:49 PM Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I have the similar kind of requirement where I want to push avro data
 into parquet. But it seems you have to do it on your own. There
 is parquet-mr project that uses hadoop to do so. I am trying to write a
 spark job to do similar kind of thing.

 On Fri, Jan 9, 2015 at 3:20 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users,

 I'm using spark SQL to create parquet files on HDFS. I would like to
 store the avro schema into the parquet meta so that non spark sql
 applications can marshall the data without avro schema using the avro
 parquet reader. Currently, schemaRDD.saveAsParquetFile does not allow to do
 that. Is there another API that allows me to do this?

 Best Regards,

 Jerry






Re: Cleaning up spark.local.dir automatically

2015-01-09 Thread Raghavendra Pandey
You may want to look at the spark.cleaner.ttl configuration, which is infinite
by default. Spark uses that setting to clean up temporary files and old metadata
from time to time.
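For example (the value is only an illustration):

import org.apache.spark.SparkConf

// ask Spark to clean up old metadata and temporary data from time to time
val conf = new SparkConf()
  .setAppName("cleanup-example")
  .set("spark.cleaner.ttl", (24 * 3600).toString)   // in seconds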

On Fri Jan 09 2015 at 8:34:10 PM michael.engl...@nomura.com wrote:

  Hi,



 Is there a way of automatically cleaning up the spark.local.dir after a
 job has been run? I have noticed a large number of temporary files have
 been stored here and are not cleaned up. The only solution I can think of
 is to run some sort of cron job to delete files older than a few days. I am
 currently using a mixture of standalone and YARN spark builds.



 Thanks,

 Michael



 This e-mail (including any attachments) is private and confidential, may
 contain proprietary or privileged information and is intended for the named
 recipient(s) only. Unintended recipients are strictly prohibited from
 taking action on the basis of information in this e-mail and must contact
 the sender immediately, delete this e-mail (and all attachments) and
 destroy any hard copies. Nomura will not accept responsibility or liability
 for the accuracy or completeness of, or the presence of any virus or
 disabling code in, this e-mail. If verification is sought please request a
 hard copy. Any reference to the terms of executed transactions should be
 treated as preliminary only and subject to formal written confirmation by
 Nomura. Nomura reserves the right to retain, monitor and intercept e-mail
 communications through its networks (subject to and in accordance with
 applicable laws). No confidentiality or privilege is waived or lost by
 Nomura by any mistransmission of this e-mail. Any reference to Nomura is
 a reference to any entity in the Nomura Holdings, Inc. group. Please read
 our Electronic Communications Legal Notice which forms part of this e-mail:
 http://www.Nomura.com/email_disclaimer.htm



Re: Spark SQL: Storing AVRO Schema in Parquet

2015-01-08 Thread Raghavendra Pandey
I came across this: http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/.
You can take a look.

On Fri Jan 09 2015 at 12:08:49 PM Raghavendra Pandey 
raghavendra.pan...@gmail.com wrote:

 I have the similar kind of requirement where I want to push avro data into
 parquet. But it seems you have to do it on your own. There is parquet-mr
 project that uses hadoop to do so. I am trying to write a spark job to do
 similar kind of thing.

 On Fri, Jan 9, 2015 at 3:20 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users,

 I'm using spark SQL to create parquet files on HDFS. I would like to
 store the avro schema into the parquet meta so that non spark sql
 applications can marshall the data without avro schema using the avro
 parquet reader. Currently, schemaRDD.saveAsParquetFile does not allow to do
 that. Is there another API that allows me to do this?

 Best Regards,

 Jerry





Re: Spark SQL: Storing AVRO Schema in Parquet

2015-01-08 Thread Raghavendra Pandey
I have a similar kind of requirement where I want to push Avro data into
Parquet. But it seems you have to do it on your own. There is the parquet-mr
project that uses Hadoop to do so. I am trying to write a Spark job to do a
similar kind of thing.

On Fri, Jan 9, 2015 at 3:20 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users,

 I'm using spark SQL to create parquet files on HDFS. I would like to store
 the avro schema into the parquet meta so that non spark sql applications
 can marshall the data without avro schema using the avro parquet reader.
 Currently, schemaRDD.saveAsParquetFile does not allow to do that. Is there
 another API that allows me to do this?

 Best Regards,

 Jerry



Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread Raghavendra Pandey
You can also use the union function on RDDs (or SparkContext.union). It is
effectively an append that adds up all the RDDs and creates one uber RDD.
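Something like this (a sketch, assuming rdds is the Seq of per-file RDDs you built):

val uber = sc.union(rdds)
// equivalent, but with a deeper lineage: rdds.reduce(_ union _)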

On Wed, Jan 7, 2015, 14:30 rkgurram rkgur...@gmail.com wrote:

 Thank you for the response, sure will try that out.

 Currently I changed my code such that the first map files.map to
 files.flatMap, which I guess will do similar what you are saying, it
 gives
 me a List[] of elements (in this case LabeledPoints, I could also do RDDs)
 which I then turned into a mega RDD. The current problem seems to be gone,
 I
 no longer get the NPE but further down I am getting a indexOutOfBounds, so
 trying to figure out if the original problem is manifesting itself as a new
 one.


 Regards
 -Ravi




 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-
 uber-RDD-tp20986p21012.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark app performance

2015-01-01 Thread Raghavendra Pandey
I have seen that link. I am using an RDD of byte arrays and Kryo serialization.
Inside mapPartitions, when I measure time, it is never more than 1 ms, whereas
the total time taken by the application is around 30 min. The codebase has a
lot of dependencies; I am trying to come up with a simple version where I can
reproduce this problem.
Also, the GC time reported by the Spark UI is always in the range of 3-4% of
total time.

On Thu, Jan 1, 2015, 14:05 Akhil Das ak...@sigmoidanalytics.com wrote:

 Would be great if you can share the piece of code happening inside your
 mapPartition, I'm assuming you are creating/handling a lot of Complex
 objects and hence it slows down the performance. Here's a link
 http://spark.apache.org/docs/latest/tuning.html to performance tuning
 if you haven't seen it already.

 Thanks
 Best Regards

 On Wed, Dec 31, 2014 at 8:45 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I have a spark app that involves series of mapPartition operations and
 then a keyBy operation. I have measured the time inside mapPartition
 function block. These blocks take trivial time. Still the application takes
 way too much time and even sparkUI shows that much time.
 So i was wondering where does it take time and how can I reduce this.

 Thanks
 Raghavendra





Re: FlatMapValues

2014-12-31 Thread Raghavendra Pandey
Why don't you push \n instead of \t in your first transformation [
(fields(0),(fields(1)+\t+fields(3)+\t+fields(5)+\t+fields(7)+\t
+fields(9)))] and then do saveAsTextFile?

-Raghavendra

On Wed Dec 31 2014 at 1:42:55 PM Sanjay Subramanian
sanjaysubraman...@yahoo.com.invalid wrote:

 hey guys

 My dataset is like this

 025126,Chills,8.10,Injection site oedema,8.10,Injection site
 reaction,8.10,Malaise,8.10,Myalgia,8.10

 Intended output is
 ==
 025126,Chills
 025126,Injection site oedema
 025126,Injection site reaction
 025126,Malaise
 025126,Myalgia

 My code is as follows but the flatMapValues does not work even after I have 
 created the pair RDD.

 

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
  }).filter(line => line.toString.length() > 0).flatMapValues(skus =>
skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

 


 thanks

 sanjay



Spark app performance

2014-12-30 Thread Raghavendra Pandey
I have a Spark app that involves a series of mapPartitions operations and then
a keyBy operation. I have measured the time inside the mapPartitions function
blocks. These blocks take trivial time. Still, the application takes way too
much time, and the Spark UI shows as much.
So I was wondering where the time goes and how I can reduce it.

Thanks
Raghavendra


Re: Can Spark 1.1.0 save checkpoint to HDFS 2.5.1?

2014-12-19 Thread Raghavendra Pandey
It seems there is a Hadoop 1.x jar somewhere in your classpath.

On Fri, Dec 19, 2014, 21:24 Sean Owen so...@cloudera.com wrote:

 Yes, but your error indicates that your application is actually using
 Hadoop 1.x of some kind. Check your dependencies, especially
 hadoop-client.

 On Fri, Dec 19, 2014 at 2:11 PM, Haopu Wang hw...@qilinsoft.com wrote:
  I’m using Spark 1.1.0 built for HDFS 2.4.
 
  My application enables check-point (to HDFS 2.5.1) and it can build. But
  when I run it, I get below error:
 
 
 
  Exception in thread main org.apache.hadoop.ipc.RemoteException:
 Server IPC
  version 9 cannot communicate with client version 4
 
  at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 
  at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 
  at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)
 
  at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
 
  at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
 
  at
  org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
 
  at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:238)
 
  at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:203)
 
  at
  org.apache.hadoop.hdfs.DistributedFileSystem.initialize(
 DistributedFileSystem.java:89)
 
  at
  org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
 
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
 
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
 
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
 
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
 
  at
  org.apache.spark.streaming.StreamingContext.checkpoint(
 StreamingContext.scala:201)
 
 
 
  Does that mean I have to use HDFS 2.4 to save check-point? Thank you!
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark SQL: How to get the hierarchical element with SQL?

2014-12-08 Thread Raghavendra Pandey
Yeah, the dot notation works. It works even for arrays. But I am not sure
if it can handle complex hierarchies.

On Mon Dec 08 2014 at 11:55:19 AM Cheng Lian lian.cs@gmail.com wrote:

  You may access it via something like SELECT filterIp.element FROM tb,
 just like Hive. Or if you’re using Spark SQL DSL, you can use
 tb.select(filterIp.element.attr).

 On 12/8/14 1:08 PM, Xuelin Cao wrote:


  Hi,

  I'm generating a Spark SQL table from an offline Json file.

  The difficulty is, in the original json file, there is a
 hierarchical structure. And, as a result, this is what I get:

  scala tb.printSchema
 root
  |-- budget: double (nullable = true)
 * |-- filterIp: array (nullable = true)*
 * ||-- element: string (containsNull = false)*
  |-- status: integer (nullable = true)
  |-- third_party: integer (nullable = true)
  |-- userId: integer (nullable = true)

  As you may have noticed, the table schema is with a hierarchical
 structure (element field is a sub-field under the filterIp field).
 Then, my question is, how do I access the element field with SQL?





Re: Locking for shared RDDs

2014-12-08 Thread Raghavendra Pandey
You don't need to worry about locks as such, since one task/worker is
exclusively responsible for one partition of the RDD. You can use the
accumulator variables that Spark provides to collect state updates.
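For example (a minimal sketch; "stream" stands for your DStream):

val processed = sc.accumulator(0L)   // updated by tasks, readable on the driver

stream.foreachRDD { rdd =>
  rdd.foreach(_ => processed += 1L)                // workers add to it
  println(s"records so far: ${processed.value}")   // driver reads the running total
}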

On Mon Dec 08 2014 at 8:14:28 PM aditya.athalye adbrihadarany...@gmail.com
wrote:

 I am relatively new to Spark. I am planning to use Spark Streaming for my
 OLAP use case, but I would like to know how RDDs are shared between
 multiple
 workers.
 If I need to constantly compute some stats on the streaming data,
 presumably
 shared state would have to updated serially by different spark workers. Is
 this managed by Spark automatically or does the application need to ensure
 distributed locks are acquired?

 Thanks



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org