Re: How is union() implemented? Need to implement column bind

2022-04-21 Thread Sonal Goyal
Seems like an interesting problem to solve!

If I have understood it correctly, you have 10114 files each with the
structure

rowid, colA
r1, a
r2, b
r3, c
...5 million rows

if you union them, you will have
rowid, colA, colB
r1, a, null
r2, b, null
r3, c, null
r1, null, d
r2, null, e
r3, null, f

Will a window partition by rowid and max on column values not work ?
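
To make the idea concrete, here is a minimal sketch in plain Python (no Spark), assuming each rowid carries at most one non-null value per column. The names `union_rows` and `collapse_by_rowid` are made up for illustration. In Spark the same effect should come from grouping on rowid and taking max of each value column, since max skips nulls.

```python
from collections import defaultdict

# Rows as they would look after the union: each row fills only one value column.
union_rows = [
    {"rowid": "r1", "colA": "a", "colB": None},
    {"rowid": "r2", "colA": "b", "colB": None},
    {"rowid": "r3", "colA": "c", "colB": None},
    {"rowid": "r1", "colA": None, "colB": "d"},
    {"rowid": "r2", "colA": None, "colB": "e"},
    {"rowid": "r3", "colA": None, "colB": "f"},
]


def collapse_by_rowid(rows, value_cols):
    """Merge all rows sharing a rowid, keeping the non-null value per column."""
    merged = defaultdict(dict)
    for row in rows:
        target = merged[row["rowid"]]
        for col in value_cols:
            if row.get(col) is not None:
                target[col] = row[col]
    # One output row per rowid, with every value column present.
    return [
        {"rowid": rid, **{col: vals.get(col) for col in value_cols}}
        for rid, vals in sorted(merged.items())
    ]


collapsed = collapse_by_rowid(union_rows, ["colA", "colB"])
for row in collapsed:
    print(row)
```

The result has one row per rowid with colA and colB side by side, which is the column bind being asked for.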

Cheers,
Sonal
https://github.com/zinggAI/zingg



On Thu, Apr 21, 2022 at 6:50 AM Sean Owen  wrote:

> Oh, Spark directly supports upserts (with the right data destination) and
> yeah you could do this as 1+ updates to a table without any pivoting,
> etc. It'd still end up being 10K+ single joins along the way but individual
> steps are simpler. It might actually be pretty efficient I/O wise as
> columnar formats would not rewrite any other data on a write like this.
>
> On Wed, Apr 20, 2022 at 8:09 PM Andrew Davidson  wrote:
>
>> Hi Sean
>>
>>
>>
>> My “insert” solution is a hack that might work, given that we can easily
>> spin up a single VM with a crazy amount of memory. I would prefer to see a
>> distributed solution. It is just a matter of time before someone wants to
>> create an even bigger table using cbind.
>>
>>
>>
>> I understand you probably already know a lot about traditional RDBMSs.
>> Much of my post is background for others.
>>
>>
>>
>> I used to do some classic relational database work before tools like
>> Hadoop, Spark and NoSQL became available.
>>
>>
>>
>> The standard operations on a single table in a relational database are
>>
>>
>>
>> Insert “row”. This is similar to Spark's union. Typically, primary keys
>> in RDBMS tables are indexed to enable quick lookup, so insert is
>> probably not one-for-one with union. The row may not simply be appended to
>> the end of the table.
>>
>>
>>
>> Update a “row”
>>
>> Delete a “row”
>>
>> Select “rows where”
>>
>>
>>
>> RDBMS servers enable row- and table-level locking. Data must always be in a
>> consistent state. You must commit or abort your changes for them to persist
>> and to release locks on the data. Locks are required because you have a
>> single resource and many users requesting service simultaneously. This is
>> very different from Spark.
>>
>>
>>
>> Storage and memory used to be really expensive, so people often tried to
>> create “1st normal form” schemas, i.e. no duplicate data, to reduce
>> hardware cost. A 1st normal form design requires you to use joins to get
>> the data table you want. Joins are expensive. Designs often duplicated some
>> data to improve performance by minimizing the number of joins required.
>> Duplicate data makes maintaining consistency harder. There are other
>> advantages to normalized data design and, as we are all aware in the big
>> data world, lots of disadvantages. The DBMS ran on a single big machine.
>> Join was not implemented as distributed map/reduce.
>>
>>
>>
>> So my idea is to use a traditional RDBMS server: my final table will have 5
>> million rows and 10,114 columns.
>>
>>  1. Read the column vector from each of the 10,114 data files.
>>  2. Insert the column vector as a row in the table.
>>     1. I read a file that has a single element on each line. All I
>>        need to do is replace \n with ,
>>  3. Now I have a table with 10,115 rows and 5 million columns.
>>  4. The row id (primary key) is the original file name.
>>  5. The columns are the row ids in the original column vectors.
>>  6. Now all I need to do is pivot this single table to get what I
>>     want. This is the only join or map/reduce-like operation.
>>  7. The result is a table with 5 million rows and 10,114 columns.
>>
>>
>>
>>
>>
>> My final table is about 220 GB. I know that at Google I have quota for up
>> to 2 mega-mem machines. Each one has something like 1.4 TB of memory.
>>
>>
>>
>> Kind regards
>>
>>
>>
>> Andy
>>
>>
>>
>>


Re: Profiling spark application

2022-01-19 Thread Sonal Goyal
Hi Prasad,

Have you checked the SparkListener -
https://mallikarjuna_g.gitbooks.io/spark/content/spark-SparkListener.html ?

Cheers,
Sonal
https://github.com/zinggAI/zingg



On Thu, Jan 20, 2022 at 10:49 AM Prasad Bhalerao <
prasadbhalerao1...@gmail.com> wrote:

> Hello,
>
> Is there any way we can profile spark applications which will show no. of
> invocations of spark api and their execution time etc etc just the way
> jprofiler shows all the details?
>
>
> Thanks,
> Prasad
>


Re: about memory size for loading file

2022-01-13 Thread Sonal Goyal
No it should not. The file would be partitioned and read across each node.
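
As a rough back-of-the-envelope sketch: the number of partitions is roughly the file size divided by the split size. The 128 MB split size below is an assumption (a common HDFS block size); the real value depends on your storage and configuration, and `estimate_partitions` is just an illustrative helper, not a Spark API.

```python
import math


def estimate_partitions(file_bytes, split_bytes=128 * 1024 * 1024):
    """Rough number of input splits for a file of the given size."""
    return max(1, math.ceil(file_bytes / split_bytes))


file_size = 10 * 1024 ** 3  # the 10 GB file from the question
partitions = estimate_partitions(file_size)
print(partitions)
```

Each task works through one split at a time, so what has to fit in memory is a split per running task, not the whole file. Memory pressure usually shows up later, for example if you cache the whole RDD or collect() it to the driver.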

On Fri, 14 Jan 2022 at 11:48 AM, frakass  wrote:

> Hello list
>
> Given the case I have a file whose size is 10GB. The ram of total
> cluster is 24GB, three nodes. So the local node has only 8GB.
> If I load this file into Spark as a RDD via sc.textFile interface, will
> this operation run into "out of memory" issue?
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Cheers,
Sonal
https://github.com/zinggAI/zingg


Re: Joining many tables Re: Pyspark debugging best practices

2022-01-03 Thread Sonal Goyal
Hi Andrew,

Do you think the following would work?

Build data frames by appending a source column (sampleName) to each. Add
extra columns as per the schema in quantSchema. Then union. So you get one
data frame with many entries per name. You can then use windowing functions
over them.
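
A small sketch of that shape in plain Python, assuming tiny made-up inputs: the sample names, gene names and the helpers `to_long` and `pivot` are all hypothetical. It tags every record with its sample, stacks them into one long list (the union), then regroups by Name.

```python
from collections import defaultdict

# Hypothetical per-sample (Name, NumReads) records, one list per quant file.
samples = {
    "sampleA": [("gene1", 10.0), ("gene2", 0.0)],
    "sampleB": [("gene1", 3.0), ("gene2", 7.5)],
}


def to_long(samples_by_name):
    """Union step: one (Name, sampleName, NumReads) record per input row."""
    for sample_name, rows in samples_by_name.items():
        for name, num_reads in rows:
            yield (name, sample_name, num_reads)


def pivot(long_rows):
    """Regroup the long records so each Name has one entry per sample."""
    wide = defaultdict(dict)
    for name, sample_name, num_reads in long_rows:
        wide[name][sample_name] = num_reads
    return dict(wide)


wide = pivot(to_long(samples))
print(wide)
```

The point of the long format is that adding a file only appends rows, so no join is needed until the final regrouping.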

On Tue, 4 Jan 2022 at 6:29 AM, Andrew Davidson 
wrote:

> Hi David
>
>
>
> I need to select 1 column from many files and combine them into a single
> table.
>
>
>
> I do not believe union() will work. It appends rows, not columns.
>
>
>
> As far as I know join() is the only way to append columns from different
> data frames.
>
>
>
> I think you are correct that using lazy evaluation over a lot of joins may
> make the execution plan too complicated. To debug I added
>
>
>
> logger.warn( "i:{}, num file rows:{} num joined rows:{}".format(i,
> df.count(), retDF.count() ) )
>
>
>
> to try and simplify the execution plan.
>
>
>
> Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started
> making some progress; however, it fails after a few files. Resources are
> maxed out!
>
>
>
> I estimated that the raw data should be < 500 GB. I am running a
> cluster with 2.8 TB, which should be more than enough to cover Spark overhead.
>
>
>
> Is spark integrated with the python garbage collector?
>
>
>
> I assume createOrReplaceTempView() would cause cache to get flushed as
> needed?
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> 
> ###
>
>
> def _loadSalmonReadsTable(self):
>     '''
>     AEDWIP TODO
>     '''
>     self.logger.info( "BEGIN" )
>     retNumReadsDF = None
>     quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE "
>
>     for i in range( len(self.fileList) ):
>         #
>         # get NumReads from next salmon quant file
>         #
>         quantFile = self.fileList[i]
>         sampleDF = self.spark.read.load( quantFile, format="csv", sep="\t",
>                                          schema=quantSchema, header="true" )
>         # did not fix bug .repartition(50)
>
>         sampleName = self.sampleNamesList[i]
>         sampleDF = sampleDF.select( ["Name", "NumReads"] )\
>                            .withColumnRenamed( "NumReads", sampleName )
>
>         sampleDF.createOrReplaceTempView( "sample" )
>
>         self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
>                          .format(i, sampleName, sampleDF.count(),
>                                  len(sampleDF.columns), sampleDF.rdd.getNumPartitions()))
>
>         #
>         # append NumReads to table of reads
>         #
>
>         # the sample name must be quoted else column names with a '-'
>         # like 1117F-0426-SM-5EGHI will generate an error
>         # spark think the '-' is an expression. '_' is also
>         # a special char for the sql like operator
>         # https://stackoverflow.com/a/63899306/4586180
>         sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
>                         from \n\
>                            retNumReadsDF as rc, \n\
>                            sample  \n\
>                         where \n\
>                             rc.Name == sample.Name \n'.format( sampleName )
>
>         self.logger.debug( "sqlStmt:\n{}\n".format( sqlStmt ) )
>         if i == 0 :
>             retNumReadsDF = sampleDF
>         else :
>             retNumReadsDF = self.spark.sql( sqlStmt )
>
>         retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )
>
>         #
>         # debug. seems like we do not make progress when we run on training
>         # nothing happens, logs do not change, cluster metrics drop suggesting no work
>         # is being done
>         # add an action to try and debug
>         # this should not change the physical plan. I.e. we still have the same number of shuffles
>         # which results in the same number of stage. We are just not building up a plan with thousands
>         # of stages.
>         #
>         self.logger.warn("AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
>                          .format(i, retNumReadsDF.count(),
>                                  len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions()) )
>
>         #
>         # TODO AEDWIP spark analyze chapter 18 debugging joins
>
>         # execution plan should be the same for each join
>         #rawCountsSDF.explain()
>
>     self.logger.info( "END\n" )
>     return retNumReadsDF
>
>
>
>
>
> *From: *David Diebold 
> *Date: *Monday, January 3, 2022 at 12:39 AM
> *To: *Andrew 

Re: Feature (?): Setting custom parameters for a Spark MLlib pipeline

2021-10-25 Thread Sonal Goyal
Hi Martin,

Agree, if you don't need the other features of MLFlow then it is likely
overkill.

Cheers,
Sonal
https://github.com/zinggAI/zingg



On Mon, Oct 25, 2021 at 4:06 PM  wrote:

> Hi Sonal,
>
> Thanks a lot for this suggestion. I presume it might indeed be possible to
> use MLFlow for this purpose, but at present it seems a bit too much to
> introduce another framework only for storing arbitrary meta-data with
> trained ML pipelines. I was hoping there might be a way to do this natively
> in Spark ML. Otherwise, I'll just create a wrapper class for the trained
> models.
>
> Cheers,
>
> Martin
>
>
>
> Am 2021-10-24 21:16, schrieb Sonal Goyal:
>
> Does MLFlow help you? https://mlflow.org/
>
> I don't know if ML flow can save arbitrary key-value pairs and associate
> them with a model, but versioning and evaluation etc are supported.
>
> Cheers,
> Sonal
> https://github.com/zinggAI/zingg
>
>
> On Wed, Oct 20, 2021 at 12:59 PM  wrote:
>
> Hello,
>
> This is my first post to this list, so I hope I won't violate any
> (un)written rules.
>
> I recently started working with SparkNLP for a larger project. SparkNLP in
> turn is based on Apache Spark's MLlib. One thing I found missing is the
> ability to store custom parameters in a Spark pipeline. It seems only
> certain pre-configured parameter values are allowed (e.g. "stages" for the
> Pipeline class).
>
> IMHO, it would be handy to be able to store custom parameters, e.g. for
> model versions or other meta-data, so that these parameters are stored with
> a trained pipeline, for instance. This could also be used to include
> evaluation results, such as accuracy, with trained ML models.
>
> (I also asked this on Stackoverflow, but didn't get a response, yet:
> https://stackoverflow.com/questions/69627820/setting-custom-parameters-for-a-spark-mllib-pipeline
> )
>
> What does the community think about this proposal? Has it been discussed
> before perhaps? Any thoughts?
>
> Cheers,
>
> Martin
>
>


Re: Feature (?): Setting custom parameters for a Spark MLlib pipeline

2021-10-24 Thread Sonal Goyal
Does MLFlow help you? https://mlflow.org/

I don't know if ML flow can save arbitrary key-value pairs and associate
them with a model, but versioning and evaluation etc are supported.

Cheers,
Sonal
https://github.com/zinggAI/zingg



On Wed, Oct 20, 2021 at 12:59 PM  wrote:

> Hello,
>
> This is my first post to this list, so I hope I won't violate any
> (un)written rules.
>
> I recently started working with SparkNLP for a larger project. SparkNLP in
> turn is based on Apache Spark's MLlib. One thing I found missing is the
> ability to store custom parameters in a Spark pipeline. It seems only
> certain pre-configured parameter values are allowed (e.g. "stages" for the
> Pipeline class).
>
> IMHO, it would be handy to be able to store custom parameters, e.g. for
> model versions or other meta-data, so that these parameters are stored with
> a trained pipeline, for instance. This could also be used to include
> evaluation results, such as accuracy, with trained ML models.
>
> (I also asked this on Stackoverflow, but didn't get a response, yet:
> https://stackoverflow.com/questions/69627820/setting-custom-parameters-for-a-spark-mllib-pipeline
> )
>
> What does the community think about this proposal? Has it been discussed
> before perhaps? Any thoughts?
>
> Cheers,
>
> Martin
>


Re: How to change a DataFrame column from nullable to not nullable in PySpark

2021-10-14 Thread Sonal Goyal
I see some nice answers at
https://stackoverflow.com/questions/46072411/can-i-change-the-nullability-of-a-column-in-my-spark-dataframe

On Thu, 14 Oct 2021 at 5:21 PM, ashok34...@yahoo.com.INVALID
 wrote:

> Gurus,
>
> I have an RDD in PySpark that I can convert to DF through
>
> df = rdd.toDF()
>
> However, when I do
>
> df.printSchema()
>
> I see the columns as nullable = true by default
>
> root
>  |-- COL-1: long (nullable = true)
>  |-- COl-2: double (nullable = true)
>  |-- COl-3: string (nullable = true)
>
> What would be the easiest way to make COL-1 NOT NULLABLE
>
> Thanking you
>
-- 
Cheers,
Sonal
https://github.com/zinggAI/zingg


Re: [EXTERNAL] [Marketing Mail] Re: [Spark] Optimize spark join on different keys for same data frame

2021-10-06 Thread Sonal Goyal
Have you tried the following?

1. Partition df1 and df2 on key1.
2. Join them.
3. Partition df3 and the result above on key2.
4. Join again.

That’s the strategy I use and it scales well for me. For reference check
getBlocks in

https://github.com/zinggAI/zingg/blob/main/core/src/main/java/zingg/Matcher.java
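
For illustration only, here is a plain-Python sketch of that partition-then-join sequence. It is not the code in Matcher.java; `hash_partition` and `partitioned_join` are hypothetical helpers, and the value columns are renamed val1/val2/val3 so they do not collide. The idea is that rows with the same key land in the same partition, so each partition can be joined independently.

```python
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Place every row in the partition chosen by the hash of its key."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[key]) % num_partitions].append(row)
    return parts


def partitioned_join(left, right, key, num_partitions=4):
    """Inner join: partition both sides on `key`, then join partition by partition."""
    joined = []
    left_parts = hash_partition(left, key, num_partitions)
    right_parts = hash_partition(right, key, num_partitions)
    for lpart, rpart in zip(left_parts, right_parts):
        index = defaultdict(list)
        for r in rpart:
            index[r[key]].append(r)
        for l in lpart:
            for r in index.get(l[key], []):
                joined.append({**r, **l})
    return joined


df1 = [
    {"key1": "a", "key2": "x", "val1": 1.0},
    {"key1": "b", "key2": "y", "val1": 2.0},
]
df2 = [{"key1": "a", "val2": 10.0}, {"key1": "b", "val2": 20.0}]
df3 = [{"key2": "x", "val3": 100.0}]

step1 = partitioned_join(df1, df2, "key1")   # partition on key1, join
step2 = partitioned_join(step1, df3, "key2")  # partition on key2, join again
print(step2)
```

In Spark the partitioning step would be something like a repartition on the join column before each join; whether it pays off depends on your data sizes and skew.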


On Tue, 5 Oct 2021 at 3:05 PM, Saurabh Gulati
 wrote:

> Hi Amit,
> The only approach I can think of is to create 2 copies of schema_df1​,
> one partitioned on key1 and other on key2 and then use these to Join.
> --
> *From:* Amit Joshi 
> *Sent:* 04 October 2021 19:13
> *To:* spark-user 
> *Subject:* [EXTERNAL] [Marketing Mail] Re: [Spark] Optimize spark join on
> different keys for same data frame
>
> Hi spark users,
>
> Can anyone please provide any views on the topic.
>
>
> Regards
> Amit Joshi
>
> On Sunday, October 3, 2021, Amit Joshi  wrote:
>
> Hi Spark-Users,
>
> Hope you are doing good.
>
> I have been working on cases where a dataframe is frequently joined with
> more than one other data frame separately, on different columns.
> I was wondering how to optimize the joins to make them faster.
> We can consider the dataset to be big, so a broadcast join is not an
> option.
>
> For eg:
>
> schema_df1  = new StructType()
> .add(StructField("key1", StringType, true))
> .add(StructField("key2", StringType, true))
> .add(StructField("val", DoubleType, true))
>
>
> schema_df2  = new StructType()
> .add(StructField("key1", StringType, true))
> .add(StructField("val", DoubleType, true))
>
>
> schema_df3  = new StructType()
> .add(StructField("key2", StringType, true))
> .add(StructField("val", DoubleType, true))
>
> Now if we want to join
> join1 =  df1.join(df2,"key1")
> join2 =  df1.join(df3,"key2")
>
> I was thinking of bucketing as a solution to speed up the joins. But if I
> bucket df1 on key1, then join2 may not benefit, and vice versa (if I
> bucket df1 on key2).
>
> Or should we bucket df1 twice, once on key1 and once on key2?
> Is there a strategy to make both joins faster?
>
>
> Regards
> Amit Joshi
>
>
>
> --
Cheers,
Sonal
https://github.com/zinggAI/zingg


[Announcement] Zingg fuzzy matching for entity resolution, deduplication and data mastering

2021-09-13 Thread Sonal Goyal
Hi All,

Super stoked to announce open sourcing Zingg, a Spark based tool to build
unified customer and supplier profiles and remove duplicates.

More details at https://github.com/zinggAI/zingg

I do hope some of you will find it useful.

Cheers,
Sonal
https://github.com/zinggAI/zingg


Re: How to submit a job via REST API?

2020-11-24 Thread Sonal Goyal
You should be able to supply the --conf and its values as part of appArgs
argument

Cheers,
Sonal
Nube Technologies 
Join me at
Data Con LA Oct 23 | Big Data Conference Europe. Nov 24 | GIDS AI/ML Dec 3




On Tue, Nov 24, 2020 at 11:31 AM Dennis Suhari 
wrote:

> Hi Yang,
>
> I am using Livy Server for submitting jobs.
>
> Br,
>
> Dennis
>
>
>
> Von meinem iPhone gesendet
>
> Am 24.11.2020 um 03:34 schrieb Zhou Yang :
>
> 
> Dear experts,
>
> I found a convenient way to submit job via Rest API at
> https://gist.github.com/arturmkrtchyan/5d8559b2911ac951d34a#file-submit_job-sh
> .
> But I do not know whether I can append a `--conf` parameter like I do
> in spark-submit. Can someone help me with this issue?
>
> *Regards, Yang*
>
>


Re: spark cassandra questiom

2020-11-23 Thread Sonal Goyal
Yes, it should be good to use Spark for this use case in my opinion. You
can look into using the Cassandra Spark connector for persisting your
updated data into Cassandra.

Cheers,
Sonal
Nube Technologies 
Join me at
Data Con LA Oct 23 | Big Data Conference Europe. Nov 24 | GIDS AI/ML Dec 3




On Tue, Nov 10, 2020 at 6:39 PM adfel70  wrote:

> I am very new to both Spark and Spark Structured Streaming. I have to
> write an application that receives very large CSV files in an HDFS
> folder. The app must take each file and, for each row, read some rows from
> a Cassandra database (not many rows will be returned for each row
> in the CSV). For each row it reads, it must perform some simple calculations,
> update the rows it read with the results and save the updated rows to
> Cassandra.
>
> I have spark version 2.4 and must use python.
>
> Is this a suitable scenario for spark structured streaming?
>
> thanks
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


Re: mission statement : unified

2020-10-19 Thread Sonal Goyal
My thought is that Spark supports analytics for structured and unstructured
data, batch as well as real time. This was pretty revolutionary when Spark
first came out. That's where the unified term came from I think. Even after
all these years, Spark remains the trusted framework for enterprise
analytics.

On Mon, 19 Oct 2020, 11:24 Gourav Sengupta wrote:

> Hi,
>
> I think that it is just a marketing statement. But with SPARK 3.x, now
> that you are seeing that SPARK is no more than just another distributed
> data processing engine, they are trying to join data pre-processing into ML
> pipelines directly. I may call that unified.
>
> But you get the same with several other frameworks as well now so not
> quite sure how unified creates a unique brand value.
>
>
> Regards,
> Gourav Sengupta
>
> On Sun, Oct 18, 2020 at 6:40 PM Hulio andres  wrote:
>
>>
>> Apache Spark's  mission statement is  *Apache Spark™* is a unified
>> analytics engine for large-scale data processing.
>>
>> What does the word "unified" refer to?
>>
>>
>>
>>
>>
>>
>
>


Re: [pyspark 2.3+] Dedupe records

2020-05-29 Thread Sonal Goyal
Hi Rishi,

1. Dataframes are RDDs under the covers. If you have unstructured data, or if
you know something about the data that lets you optimize the
computation, you can go with RDDs. Otherwise Dataframes, which are optimized
by Spark SQL, should be fine.
2. For incremental deduplication, I guess you can hash your data based on
some particular values and then only compare the new records against the
ones which have the same hash. That should reduce the order of comparisons
drastically provided you can come up with a good indexing/hashing scheme as
per your dataset.
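
A minimal sketch of the hashing idea in plain Python, under loud assumptions: the blocking key (first three letters of the name plus zip code) and the `is_duplicate` rule are made up for the example, and you would replace both with something that suits your data.

```python
from collections import defaultdict


def blocking_key(record):
    # Hypothetical scheme: first 3 letters of the normalized name + zip code.
    return (record["name"].strip().lower()[:3], record["zip"])


def build_index(records):
    """Group the already-deduplicated records by their blocking key."""
    index = defaultdict(list)
    for rec in records:
        index[blocking_key(rec)].append(rec)
    return index


def is_duplicate(a, b):
    # Placeholder comparison; a real one would likely be fuzzier.
    return a["name"].strip().lower() == b["name"].strip().lower()


def dedupe_incremental(index, new_records):
    """Compare each new record only against records in the same block."""
    accepted = []
    for rec in new_records:
        bucket = index[blocking_key(rec)]
        if any(is_duplicate(rec, old) for old in bucket):
            continue
        bucket.append(rec)
        accepted.append(rec)
    return accepted


existing = [{"name": "Alice Smith", "zip": "10001"}]
incoming = [
    {"name": "alice smith ", "zip": "10001"},  # duplicate of the existing record
    {"name": "Bob Jones", "zip": "10001"},     # genuinely new
]

index = build_index(existing)
accepted = dedupe_incremental(index, incoming)
print(accepted)
```

Only records that share a blocking key are ever compared, which is where the reduction in comparisons comes from.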

Thanks,
Sonal
Nube Technologies 






On Sat, May 30, 2020 at 8:17 AM Rishi Shah  wrote:

> Hi All,
>
> I have around 100B records where I get new, update & delete records.
> Update/delete records are not that frequent. I would like to get some
> advice on the below:
>
> 1) Should I use RDD + reduceByKey or a DataFrame window operation for data
> of this size? Which one would outperform the other? Which is more reliable
> and low maintenance?
> 2) Also, how would you suggest we do incremental deduplication? Currently
> we do full processing once a week and no dedupe during weekdays to avoid
> heavy processing. However, I would like to explore an incremental dedupe
> option and weigh the pros/cons.
>
> Any input is highly appreciated!
>
> --
> Regards,
>
> Rishi Shah
>


Re: How to populate all possible combination values in columns using Spark SQL

2020-05-06 Thread Sonal Goyal
As mentioned in the comments on SO, can you provide a (masked) sample of
the data? It will be easier to see what you are trying to do if you add the
year column

Thanks,
Sonal
Nube Technologies 






On Thu, May 7, 2020 at 10:26 AM Aakash Basu 
wrote:

> Hi,
>
> I've described the problem on Stack Overflow in a lot of detail; can
> you kindly check and help if possible?
>
> https://stackoverflow.com/q/61643910/5536733
>
> I'd be absolutely fine if someone solves it using Spark SQL APIs rather
> than plain spark SQL query.
>
> Thanks,
> Aakash.
>


Re: [pyspark] Load a master data file to spark ecosystem

2020-04-24 Thread Sonal Goyal
How does your tree_lookup_value function work?
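
Depending on the answer, one pattern that might help is loading the tree once per partition instead of once per line. Below is a plain-Python sketch of that pattern; `process_partition`, `fake_load_tree` and `fake_lookup` are hypothetical stand-ins for your own loader and lookup, not Spark APIs.

```python
def process_partition(lines, load_tree, lookup):
    """Load the lookup structure once, then reuse it for every line."""
    tree = load_tree()  # runs once per partition, not once per line
    for line in lines:
        fields = line.split(" ")
        yield {"host": lookup(tree, fields[0])}


load_calls = []


def fake_load_tree():
    # Stand-in for parsing the 2GB tree file; records how often it is called.
    load_calls.append(1)
    return {"10.0.0.1": "web-1"}


def fake_lookup(tree, key):
    return tree.get(key)


log_lines = ["10.0.0.1 GET /index.html", "10.0.0.2 GET /about.html"]
rows = list(process_partition(log_lines, fake_load_tree, fake_lookup))
print(rows, len(load_calls))
```

In PySpark a generator like this could be handed to `rdd.mapPartitions`, so the expensive load happens once per partition. A broadcast variable is another option if the tree can be pickled.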

Thanks,
Sonal
Nube Technologies 






On Fri, Apr 24, 2020 at 8:47 PM Arjun Chundiran  wrote:

> Hi Team,
>
> I have asked this question in stack overflow
> 
> and I didn't really get any convincing answers. Can somebody help me to
> solve this issue?
>
> Below is my problem
>
> While building a log processing system, I came across a scenario where I
> need to look up data from a tree file (Like a DB) for each and every log
> line for corresponding value. What is the best approach to load an external
> file which is very large into the spark ecosystem? The tree file is of size
> 2GB.
>
> Here is my scenario
>
>    1. I have a file that contains a huge number of log lines.
>    2. Each log line needs to be split by a delimiter into 70 fields.
>    3. I need to look up the data from the tree file for one of the 70
>       fields of a log line.
>
> I am using Apache Spark Python API and running on a 3 node cluster.
>
> Below is the code which I have written. But it is really slow
>
> def process_logline(line, tree):
>     row_dict = {}
>     line_list = line.split(" ")
>     row_dict["host"] = tree_lookup_value(tree, line_list[0])
>     new_row = Row(**row_dict)
>     return new_row
>
> def run_job(vals):
>     spark.sparkContext.addFile('somefile')
>     tree_val = open(SparkFiles.get('somefile'))
>     lines = spark.sparkContext.textFile("log_file")
>     converted_lines_rdd = lines.map(lambda l: process_logline(l, tree_val))
>     log_line_rdd = spark.createDataFrame(converted_lines_rdd)
>     log_line_rdd.show()
>
> Basically I need some option to load the file once into the memory of the
> workers and use it for the entire lifetime of the job using the Python API.
>
> Thanks in advance
> Arjun
>
>
>
>


Re: Is RDD thread safe?

2019-11-19 Thread Sonal Goyal
The RDD or the dataframe is distributed and partitioned by Spark so as to
leverage all your workers (CPUs) effectively. So all the Dataframe
operations are actually happening simultaneously, each on a section of the
data. Why do you want to use threading here?

Thanks,
Sonal
Nube Technologies 






On Tue, Nov 12, 2019 at 7:18 AM Chang Chen  wrote:

>
> Hi all
>
> I have a case where I need to cache a source RDD, and then create different
> DataFrames from it in different threads to accelerate queries.
>
> I know that SparkSession is thread safe (
> https://issues.apache.org/jira/browse/SPARK-15135), but I am not sure
> whether an RDD is thread safe or not.
>
> Thanks
> Chang
>


Re: Is it possible to rate limit an UDP?

2019-01-09 Thread Sonal Goyal
Have you tried controlling the number of partitions of the dataframe? Say
you have 5 partitions; it means you are making 5 concurrent calls to the
web service. The throughput of the web service would be your bottleneck and
Spark workers would be waiting for tasks, but if you can't control the REST
service, maybe it's worth a shot.
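
If limiting concurrency alone is not enough, each partition could also pace its own calls. Here is a sketch in plain Python, assuming the overall budget is split evenly across partitions; `Throttle` and `call_with_limit` are hypothetical names, and the lambda stands in for your REST call.

```python
import time


class Throttle:
    """Allows at most `calls_per_second` calls by sleeping between them."""

    def __init__(self, calls_per_second, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = 1.0 / calls_per_second
        self.clock = clock
        self.sleep = sleep
        self.next_allowed = None

    def wait(self):
        now = self.clock()
        if self.next_allowed is not None and now < self.next_allowed:
            self.sleep(self.next_allowed - now)
            now = self.clock()
        self.next_allowed = now + self.min_interval


def call_with_limit(rows, call_service, total_rate, num_partitions,
                    clock=time.monotonic, sleep=time.sleep):
    """Process one partition, using its share of the total calls-per-second budget."""
    throttle = Throttle(total_rate / num_partitions, clock, sleep)
    for row in rows:
        throttle.wait()
        yield call_service(row)


# Example: 100 op/s overall shared by 5 partitions gives 20 op/s per partition.
results = list(call_with_limit([1, 2, 3], lambda x: x * 2,
                               total_rate=100, num_partitions=5))
print(results)
```

In PySpark, a generator like this could be passed to `rdd.mapPartitions` after repartitioning to the chosen number of partitions. Note that the limit only holds if all partitions really run at once and tasks are not retried.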

Thanks,
Sonal
Nube Technologies 






On Wed, Jan 9, 2019 at 4:51 AM  wrote:

> I have a data frame to which I apply a UDF that calls a REST web
> service. This web service is distributed over only a few nodes and it won’t
> be able to handle a massive load from Spark.
>
>
>
> Is it possible to rate limit this UDF? For example, something like 100
> op/s.
>
>
>
> If not, what are the options? Is splitting the df an option?
>
>
>
> I’ve read a similar question on Stack Overflow [1] and the solution
> suggests Spark Streaming, but my application does not involve streaming.
> Do I need to turn the operations into a streaming workflow to achieve
> something like that?
>
>
>
> Current Workflow : Hive -> Spark ->  Service
>
>
>
> Thank you
>
>
>
> [1]
> https://stackoverflow.com/questions/43953882/how-to-rate-limit-a-spark-map-operation
>


Re: Error in show()

2018-09-08 Thread Sonal Goyal
It says serialization error - could there be a column value which is not
getting parsed as int in one of the rows 31-60? The relevant Python code in
serializers.py which is throwing the error is

def read_int(stream):
    length = stream.read(4)
    if not length:
        raise EOFError
    return struct.unpack("!i", length)[0]
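
To see what this helper does in isolation, here is a small standalone sketch that copies the function above and feeds it in-memory streams (the `io.BytesIO` inputs are made up for the demo). It reads a 4-byte big-endian integer, and it raises EOFError only when the stream returns no bytes at all.

```python
import io
import struct


def read_int(stream):
    length = stream.read(4)
    if not length:
        raise EOFError
    return struct.unpack("!i", length)[0]


# A stream holding one 4-byte big-endian integer.
good_stream = io.BytesIO(struct.pack("!i", 42))
value = read_int(good_stream)

# A stream that has already ended: read(4) returns b"".
empty_stream = io.BytesIO(b"")
try:
    read_int(empty_stream)
    hit_eof = False
except EOFError:
    hit_eof = True

print(value, hit_eof)
```

So if the traceback shows EOFError from here, the reader found the stream already finished when it expected another integer.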


Thanks,
Sonal
Nube Technologies 





On Fri, Sep 7, 2018 at 12:22 PM, Apostolos N. Papadopoulos <
papad...@csd.auth.gr> wrote:

> Can you isolate the row that is causing the problem? I mean start using
> show(31) up to show(60).
>
> Perhaps this will help you to understand the problem.
>
> regards,
>
> Apostolos
>
>
>
> On 07/09/2018 01:11 πμ, dimitris plakas wrote:
>
> Hello everyone, I am new to PySpark and I am facing an issue. Let me
> explain what exactly the problem is.
>
> I have a dataframe and I apply a map() function to it:
> dataframe2 = dataframe1.rdd.map(custom_function())
> dataframe = sqlContext.createDataFrame(dataframe2)
>
> When I call
>
> dataframe.show(30, True) it shows the result.
>
> When I call dataframe.show(60, True) I get the error. The error is in
> the attachment Pyspark_Error.txt.
>
> Could you please explain what this error is and how to overcome it?
>
>
>
>
>
> --
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://datalab.csd.auth.gr/~apostol
>
>


Re: [External Sender] How to debug Spark job

2018-09-08 Thread Sonal Goyal
You could also try to profile your program on the executor or driver by
using jvisualvm or yourkit to see if there is any memory/cpu optimization
you could do.

Thanks,
Sonal
Nube Technologies 





On Fri, Sep 7, 2018 at 6:35 PM, James Starks  wrote:

> Got the root cause eventually as it throws java.lang.OutOfMemoryError:
> Java heap space. Increasing --driver-memory temporarily fixes the problem.
> Thanks.
>
> ‐‐‐ Original Message ‐‐‐
> On 7 September 2018 12:32 PM, Femi Anthony 
> wrote:
>
> One way I would go about this would be to try running a new_df.show(numcols,
> truncate=False) on a few columns before you try writing to parquet to
> force computation of newdf and see whether the hanging is occurring at that
> point or during the write. You may also try doing a newdf.count() as well.
>
> Femi
>
> On Fri, Sep 7, 2018 at 5:48 AM James Starks 
> wrote:
>
>>
>> I have a Spark job that reads from a PostgreSQL (v9.5) table and writes
>> the result to parquet. The code flow is not complicated, basically
>>
>> case class MyCaseClass(field1: String, field2: String)
>> val df = spark.read.format("jdbc")...load()
>> df.createOrReplaceTempView(...)
>> val newdf = spark.sql("select field1, field2 from
>> mytable").as[MyCaseClass].map { row =>
>>   val fieldX = ... // extract something from field2
>>   (row.field1, fieldX)
>> }.filter { ... /* filter out field 3 that's not valid */ }
>> newdf.write.mode(...).parquet(destPath)
>>
>> This job worked correctly without a problem. But it doesn't seem to work
>> (the job looks like it hangs) after adding more fields. The refactored job
>> looks as below
>> ...
>> val newdf = spark.sql("select field1, field2, ... fieldN from
>> mytable").as[MyCaseClassWithMoreFields].map { row =>
>> ...
>> NewCaseClassWithMoreFields(...) // all fields plus fieldX
>> }.filter { ... }
>> newdf.write.mode(...).parquet(destPath)
>>
>> Basically what the job does is extract some info from one of the fields
>> in the db table, append that newly extracted field to the original row, and
>> then dump the whole new table to parquet.
>>
>> new field + (original field1 + ... + original fieldN)
>> ...
>> ...
>>
>> Records loaded by Spark SQL into the Spark job (before refactoring) are
>> around 8MM, and this remains the same, but when the refactored Spark job
>> runs, it appears to hang there without progress. The only output on the
>> console is (there is no crash, no exceptions thrown)
>>
>> WARN  HeartbeatReceiver:66 - Removing executor driver with no recent
>> heartbeats: 137128 ms exceeds timeout 12 ms
>>
>> Memory in top command looks like
>>
>> VIRT     RES     SHR    %CPU   %MEM
>> 15.866g  8.001g  41.4m  740.3  25.6
>>
>> The command used to  submit spark job is
>>
>> spark-submit --class ... --master local[*] --driver-memory 10g
>> --executor-memory 10g ... --files ... --driver-class-path ... 
>> ...
>>
>> How can I debug or check which part of my code might cause the problem
>> (so I can improve it)?
>>
>> Thanks
>>
>>
>>
>
>
>
>


Re: Default Java Opts Standalone

2018-08-30 Thread Sonal Goyal
Hi Eevee,

For the executor, have you tried

a. Passing --conf "spark.executor.extraJavaOptions=-XX" as part of the
spark-submit command line if you want it application specific, OR
b. Setting spark.executor.extraJavaOptions in conf/spark-defaults.conf for
all jobs.


Thanks,
Sonal
Nube Technologies 





On Thu, Aug 30, 2018 at 5:12 PM, Evelyn Bayes  wrote:

> Hey all,
>
> Stuck trying to set a parameter in the spark-env.sh and I’m hoping someone
> here knows how.
>
> I want to set the JVM setting -XX:+ExitOnOutOfMemoryError for both Spark
> executors and Spark workers in a standalone mode.
>
> My best guess so far is:
> *Worker*
> SPARK_WORKER_OPTS="${SPARK_WORKER_OPTS} -Dspark.worker.extraJavaOptions=-XX:+ExitOnOutOfMemoryError"
> *Executor*
> SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} -Dspark.executor.extraJavaOptions=-XX:+ExitOnOutOfMemoryError"
>
> Anyone know the actual way to set this or a good place to learn about how
> this stuff works? I’ve already seen the Spark conf and standalone
> documentation and it doesn’t really make this stuff clear.
>
> Thanks a bunch,
> Eevee.
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Sonal Goyal
Hi Patrick,

Sorry, is there something here that helps you beyond repartition(number of
partitions) or calling your udf on foreachPartition? If your data is on
disk, Spark is already partitioning it for you by rows. How is adding the
host info helping?

Thanks,
Sonal
Nube Technologies 





On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
pmccar...@dstillery.com.invalid> wrote:

> Mostly I'm guessing that it adds efficiency to a job where partitioning is
> required but shuffling is not.
>
> For example, if I want to apply a UDF to 1tb of records on disk, I might
> need to repartition(5) to get the task size down to an acceptable size
> for my cluster. If I don't care that it's totally balanced, then I'd hope
> that I could save a lot of overhead with
>
> foo = df.withColumn('randkey', F.floor(1000*F.rand())).repartition(5000, 'randkey', 'host').apply(udf)
>
>
> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
> wrote:
>
>> Well if we think of shuffling as a necessity to perform an operation,
>> then the problem would be that you are adding an aggregation stage to a
>> job that is going to get shuffled anyway.  Like if you need to join two
>> datasets, then Spark will still shuffle the data, whether they are grouped
>> by hostname prior to that or not.  My question is, is there anything else
>> that you would expect to gain, except for enforcing maybe a dataset that is
>> already bucketed? Like you could enforce that data is where it is supposed
>> to be, but what else would you avoid?
>>
>> Sent from my iPhone
>>
>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy wrote:
>> >
>> > When debugging some behavior on my YARN cluster I wrote the following
>> PySpark UDF to figure out what host was operating on what row of data:
>> >
>> > @F.udf(T.StringType())
>> > def add_hostname(x):
>> >     import socket
>> >     return str(socket.gethostname())
>> >
>> > It occurred to me that I could use this to enforce node-locality for
>> other operations:
>> >
>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>> >
>> > When working on a big job without obvious partition keys, this seems
>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>> >
>> > What problems would I introduce by trying to partition on hostname like
>> this?
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Caching small Rdd's take really long time and Spark seems frozen

2018-08-24 Thread Sonal Goyal
Without knowing more about your application, it is hard to say. Maybe it
runs faster in local mode because there is no shuffling across the network?
The Spark UI would be your best bet for finding out which stage is slowing
things down.

On Fri 24 Aug, 2018, 3:26 PM Guillermo Ortiz,  wrote:

> Another test I just did was to execute with local[X], and this problem
> doesn't happen.  Communication problems?
>
> 2018-08-23 22:43 GMT+02:00 Guillermo Ortiz :
>
>> It's a complex DAG before the point where I cache the RDD: there are some
>> joins, filters and maps before caching the data, but most of the time it
>> takes almost no time. I could understand it if it took the same time every
>> time to process or cache the data. Besides, it seems random, and there
>> isn't any weird data in the input.
>>
>> Another test I tried was to disable caching, and I saw that all the
>> microbatches take the same time, so it seems to be related to caching
>> these RDDs.
>>
>> On Thu, Aug 23, 2018 at 15:29, Sonal Goyal wrote:
>>
>>> How are these small RDDs created? Could the blockage be in computing
>>> them rather than in caching them?
>>>
>>> Thanks,
>>> Sonal
>>> Nube Technologies <http://www.nubetech.co>
>>>
>>> <http://in.linkedin.com/in/sonalgoyal>
>>>
>>>
>>>
>>> On Thu, Aug 23, 2018 at 6:38 PM, Guillermo Ortiz 
>>> wrote:
>>>
>>>> I use Spark caching with the persist method. I have several RDDs that
>>>> I cache, but some of them are pretty small (about 300 KB). Most of the
>>>> time it works well and the whole job usually takes 1s, but sometimes it
>>>> takes about 40s to store 300 KB in the cache.
>>>>
>>>> If I go to Spark UI -> Cache, I can see the percentage increasing up to
>>>> 83% (250 KB) and then it stops for a while. If I check the event
>>>> timeline in the Spark UI, I can see that when this happens there is a
>>>> node where tasks take a very long time. This can be any node in the
>>>> cluster; it's not always the same one.
>>>>
>>>> In the Spark executor logs I can see that it takes about 40s to store
>>>> 3.7 KB when this problem occurs:
>>>>
>>>> INFO  2018-08-23 12:46:58 Logging.scala:54 -
>>>> org.apache.spark.storage.BlockManager: Found block rdd_1705_23 locally
>>>> INFO  2018-08-23 12:47:38 Logging.scala:54 -
>>>> org.apache.spark.storage.memory.MemoryStore: Block rdd_1692_7 stored as
>>>> bytes in memory (estimated size 3.7 KB, free 1048.0 MB)
>>>> INFO  2018-08-23 12:47:38 Logging.scala:54 -
>>>> org.apache.spark.storage.BlockManager: Found block rdd_1692_7 locally
>>>>
>>>> I have tried MEMORY_ONLY, MEMORY_AND_SER and so on, with the same
>>>> results. I have checked disk I/O (although I guess that makes no sense
>>>> with MEMORY_ONLY) and I can't see any problem. This happens randomly,
>>>> in perhaps 25% of the jobs.
>>>>
>>>> Any idea what could be happening?
>>>>
>>>
>>>
>


Re: Caching small Rdd's take really long time and Spark seems frozen

2018-08-23 Thread Sonal Goyal
How are these small RDDs created? Could the blockage be in computing them
rather than in caching them?

Thanks,
Sonal
Nube Technologies 





On Thu, Aug 23, 2018 at 6:38 PM, Guillermo Ortiz 
wrote:

> I use Spark caching with the persist method. I have several RDDs that I
> cache, but some of them are pretty small (about 300 KB). Most of the time
> it works well and the whole job usually takes 1s, but sometimes it takes
> about 40s to store 300 KB in the cache.
>
> If I go to Spark UI -> Cache, I can see the percentage increasing up to
> 83% (250 KB) and then it stops for a while. If I check the event timeline
> in the Spark UI, I can see that when this happens there is a node where
> tasks take a very long time. This can be any node in the cluster; it's
> not always the same one.
>
> In the Spark executor logs I can see that it takes about 40s to store
> 3.7 KB when this problem occurs:
>
> INFO  2018-08-23 12:46:58 Logging.scala:54 -
> org.apache.spark.storage.BlockManager: Found block rdd_1705_23 locally
> INFO  2018-08-23 12:47:38 Logging.scala:54 -
> org.apache.spark.storage.memory.MemoryStore: Block rdd_1692_7 stored as
> bytes in memory (estimated size 3.7 KB, free 1048.0 MB)
> INFO  2018-08-23 12:47:38 Logging.scala:54 -
> org.apache.spark.storage.BlockManager: Found block rdd_1692_7 locally
>
> I have tried MEMORY_ONLY, MEMORY_AND_SER and so on, with the same
> results. I have checked disk I/O (although I guess that makes no sense
> with MEMORY_ONLY) and I can't see any problem. This happens randomly,
> in perhaps 25% of the jobs.
>
> Any idea what could be happening?
>


Re: How to deal with context dependent computing?

2018-08-23 Thread Sonal Goyal
Hi Junfeng,

Can you please show by means of an example what you are trying to achieve?

Thanks,
Sonal
Nube Technologies 





On Thu, Aug 23, 2018 at 8:22 AM, JF Chen  wrote:

> For example, I have some timestamped data marked as category A or B
> and ordered by time. Now I want to calculate each duration from A to B. In
> a normal program, I can use a flag to record whether the previous record
> was A or B, and then calculate the duration. But how do I do this with a
> Spark DataFrame?
>
> Thanks!
>
> Regards,
> Junfeng Chen
>
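
The "flag" approach described in the question can be sketched in plain
Python to pin down the intended semantics. This is an illustration under
assumptions that the thread does not confirm: each B is paired with the
most recent unmatched A, timestamps are plain numbers, and the function name
durations_a_to_b and the sample data are invented.

```python
def durations_a_to_b(events):
    """Yield (start, end, end - start) for each A that is followed by a B.

    `events` is an iterable of (timestamp, category) tuples already sorted
    by timestamp. Only the most recent unmatched A is remembered.
    """
    last_a = None
    for ts, category in events:
        if category == "A":
            last_a = ts
        elif category == "B" and last_a is not None:
            yield (last_a, ts, ts - last_a)
            last_a = None


# Made-up sample data: timestamps are plain seconds.
sample = [(0, "A"), (5, "B"), (7, "A"), (8, "A"), (12, "B"), (20, "B")]
result = list(durations_a_to_b(sample))
print(result)
```

With a DataFrame, the "previous record" part of this logic is usually
expressed with the lag window function over a window ordered by the
timestamp column, rather than with a mutable flag.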


Re: Spark application complete it's job successfully on Yarn cluster but yarn register it as failed

2018-06-20 Thread Sonal Goyal
Have you checked the logs - they probably should have some more details.

On Wed 20 Jun, 2018, 2:51 PM Soheil Pourbafrani, 
wrote:

> Hi,
>
> I run a Spark application on a YARN cluster and it completes the process
> successfully, but at the end YARN prints in the console:
>
> client token: N/A
> diagnostics: Application application_1529485137783_0004 failed 4 times due
> to AM Container for appattempt_1529485137783_0004_04 exited with
> exitCode: 1
> Failing this attempt.Diagnostics: Exception from container-launch.
> Container id: container_e447_1529485137783_0004_04_01
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
> at org.apache.hadoop.util.Shell.run(Shell.java:869)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:236)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:305)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:84)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> Container exited with a non-zero exit code 1
> For more detailed output, check the application tracking page:
> http://snamenode:8088/cluster/app/application_1529485137783_0004 Then
> click on links to logs of each attempt.
> . Failing the application.
> ApplicationMaster host: N/A
> ApplicationMaster RPC port: -1
> queue: streamQ
> start time: 1529485513957
> final status: FAILED
> tracking URL:
> http://snamenode:8088/cluster/app/application_1529485137783_0004
> user: manager
> Exception in thread "main" org.apache.spark.SparkException: Application
> application_1529485137783_0004 finished with failed status
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1104)
> at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1150)
> at org.apache.spark.deploy.yarn.Client.main(Client.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> and in the YARN UI the app status is registered as failed. What's the problem?
>


Re: How can I do the following simple scenario in spark

2018-06-19 Thread Sonal Goyal
Try flatMapToPair instead of flatMap
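
Since the question quoted below uses PySpark, where the same effect comes
from having the function passed to flatMap return a list of pairs, here is a
minimal sketch of such a function. The name to_pairs is made up, and a
simple regular expression stands in for word_tokenize, so the tokens
(including punctuation) may differ from what the original tokenizer
produces.

```python
import re


def to_pairs(full_text, doc_id):
    """Return one (token, doc_id) pair per token in full_text."""
    # Stand-in tokenizer: runs of word characters, or single punctuation marks.
    tokens = re.findall(r"\w+|[^\w\s]", full_text.lower())
    return [(token, doc_id) for token in tokens]


# 42 is an arbitrary example id.
pairs = to_pairs("Hi, How are you?", 42)
print(pairs)
```

Passing lambda row: to_pairs(row[0], row[1]) to flatMap on the RDD should
then emit one pair per token, because flatMap flattens the returned list.
The attempt in the question returns a single tuple whose first element is
the whole token list, so flatMap flattens that tuple instead.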

Thanks,
Sonal
Nube Technologies 





On Tue, Jun 19, 2018 at 11:08 PM, Soheil Pourbafrani 
wrote:

> Hi, I have a JSON file in the following structure:
> +------------+---+
> |   full_text| id|
> +------------+---+
>
> I want to tokenize each sentence into pairs of (word, id)
>
> for example, having the record : ("Hi, How are you?", id) I want to
> convert the dataframe to:
> hi, id
> how, id
> are, id
> you, id
> ?, id
>
> So I try :
>
> data.rdd.map(lambda data: (data[0], data[1]))\
>     .flatMap(lambda row: (word_tokenize(row[0].lower()), row[1]))
>
> but it converts the dataframe to:
> [hi, how, are, you, ?]
>
> How can I do the desired transformation?
>


Re: Process large JSON file without causing OOM

2017-11-13 Thread Sonal Goyal
If you are running Spark with local[*] as master, there will be a single
process whose memory is controlled by the --driver-memory command line
option to spark-submit. Check

http://spark.apache.org/docs/latest/configuration.html

spark.driver.memory (default: 1g): Amount of memory to use for the driver
process, i.e. where SparkContext is initialized (e.g. 1g, 2g).
*Note:* In client mode, this config must not be set through the SparkConf
directly in your application, because the driver JVM has already started at
that point. Instead, please set this through the --driver-memory command
line option or in your default properties file.

Thanks,
Sonal
Nube Technologies 





On Tue, Nov 14, 2017 at 9:37 AM, Alec Swan  wrote:

> Hi Joel,
>
> Here are the relevant snippets of my code and an OOM error thrown
> in frameWriter.save(..). Surprisingly, the heap dump is pretty small ~60MB
> even though I am running with -Xmx10G and 4G executor and driver memory as
> shown below.
>
> SparkConf sparkConf = new SparkConf()
>     .setAppName("My Service")
>     .setMaster("local[*]")
>     .set("spark.ui.enabled", "true")
>     .set("spark.executor.memory", "4G")
>     .set("spark.driver.memory", "4G");
>
> sparkSessionBuilder = SparkSession.builder().config(sparkConf).enableHiveSupport();
>
> Dataset<Row> events = sparkSession.read()
>     .format("json")
>     .schema(inputConfig.getSchema())
>     .load(inputFile.getPath());
>
> DataFrameWriter<Row> frameWriter = events
>     // select "data.customer AS `customer`", ...
>     .selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns()))
>     .write()
>     .options(outputConfig.getProperties()) // compression=zlib
>     .format("orc")
>     // partition by "customer"
>     .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions()))
>     .save(outputUri.getPath());
>
>
> Here is the error log I get at runtime:
>
> 17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
> 17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
> java.lang.OutOfMemoryError: Java heap space
> Dumping heap to java_pid3790.hprof ...
> Heap dump file created [62653841 bytes in 2.212 secs]
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing "kill -9 3790"...
>
>
> And here is the thread from the thread dump that caused OOM:
>
> "Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
> at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
> at org.apache.hadoop.io.compress.BlockDecompressorStream.
> getCompressedData(BlockDecompressorStream.java:123)
> at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(
> BlockDecompressorStream.java:98)
> at org.apache.hadoop.io.compress.DecompressorStream.read(
> DecompressorStream.java:85)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>Local Variable: byte[]#3957
>Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
>Local Variable: org.apache.hadoop.io.Text#5
> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
> skipUtfByteOrderMark(LineRecordReader.java:144)
> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
> nextKeyValue(LineRecordReader.java:184)
>Local Variable: org.apache.hadoop.mapreduce.
> lib.input.LineRecordReader#1
> at org.apache.spark.sql.execution.datasources.
> RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>Local Variable: org.apache.spark.sql.execution.datasources.
> RecordReaderIterator#1
> at org.apache.spark.sql.execution.datasources.
> HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>Local Variable: org.apache.spark.sql.execution.datasources.
> HadoopFileLinesReader#1
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>Local Variable: scala.collection.Iterator$$anon$12#1
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:177)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
>Local Variable: org.apache.spark.sql.execution.datasources.
> FileScanRDD$$anon$1#1
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 

Re: Where can I get few GBs of sample data?

2017-09-28 Thread Sonal Goyal
Here are some links for public data sets

https://aws.amazon.com/public-datasets/
https://www.springboard.com/blog/free-public-data-sets-data-science-project/

Thanks,
Sonal
Nube Technologies 





On Thu, Sep 28, 2017 at 9:34 PM, Gaurav1809  wrote:

> Hi All,
>
> I have set up a multi-node Spark cluster and am now looking for a good
> volume of data to test it and see how it performs.
> Can anyone provide pointers as to where I can get a few GBs of free sample
> data?
>
> Thanks and regards,
> Gaurav
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


Re: More instances = slower Spark job

2017-09-28 Thread Sonal Goyal
Also, check whether the compression codec you use is splittable.
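
Why this matters: a file compressed with a non-splittable codec has to be
read from start to finish by a single task, so adding instances does not
help with that file. The lookup below is only an illustrative sketch. The
table is deliberately incomplete, the function name is_splittable is made
up, and it decides purely from the file extension.

```python
import os

# Splittability of a few common cases, keyed by file extension.
# Illustrative and incomplete; extend it for the codecs you actually use.
SPLITTABLE_BY_EXTENSION = {
    ".gz": False,   # gzip: the whole file is decompressed by one task
    ".bz2": True,   # bzip2: block-oriented, so it can be split
    ".txt": True,   # uncompressed text
}


def is_splittable(path):
    """Return True or False for known extensions, None when unknown."""
    _, ext = os.path.splitext(path.lower())
    return SPLITTABLE_BY_EXTENSION.get(ext)


for name in ["logs/part-0001.gz", "logs/part-0001.bz2", "logs/part-0001.txt"]:
    print(name, is_splittable(name))
```

If the input turns out to be gzip, two common workarounds are to use many
smaller files or to repartition right after reading, at the cost of a
shuffle.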

Thanks,
Sonal
Nube Technologies 





On Thu, Sep 28, 2017 at 2:17 PM, Tejeshwar J1 <
tejeshwar...@globallogic.com.invalid> wrote:

> Hi Miller,
>
> Try using
>
> 1. *coalesce(numberOfPartitions)* to reduce the number of partitions in
> order to avoid idle cores.
>
> 2. Try reducing executor memory as you increase the number of executors.
>
> 3. Try tuning GC or changing the default Java serialization to *Kryo*
> serialization.
>
> Thanks,
> Tejeshwar
>
> *From:* Jeroen Miller [mailto:bluedasya...@gmail.com]
> *Sent:* Thursday, September 28, 2017 2:11 PM
> *To:* user@spark.apache.org
> *Subject:* More instances = slower Spark job
>
> Hello,
>
> I am experiencing a disappointing performance issue with my Spark jobs
> as I scale up the number of instances.
>
> The task is trivial: I am loading large (compressed) text files from S3,
> filtering out lines that do not match a regex, counting the number
> of remaining lines and saving the resulting datasets as (compressed)
> text files on S3. Nothing that a simple grep couldn't do, except that
> the files are too large to be downloaded and processed locally.
>
> On a single instance, I can process X GBs per hour. When scaling up
> to 10 instances, I noticed that processing the /same/ amount of data
> actually takes /longer/.
>
> This is quite surprising as the task is really simple: I was expecting
> a significant speed-up. My naive idea was that each executor would
> process a fraction of the input file, count the remaining lines /locally/,
> and save its part of the processed file /independently/, thus no data
> shuffling would occur.
>
> Obviously, this is not what is happening.
>
> Can anyone shed some light on this or provide pointers to relevant
> information?
>
> Regards,
>
> Jeroen
>


Re: Efficient Spark-Submit planning

2017-09-12 Thread Sonal Goyal
Overall the defaults are sensible, but you definitely have to look at your
application and optimise a few of them. I mostly refer to the following
links when a job is slow or failing, or when we have more hardware than we
are actually utilizing.

http://spark.apache.org/docs/latest/tuning.html
http://spark.apache.org/docs/latest/hardware-provisioning.html
http://spark.apache.org/docs/latest/configuration.html


Thanks,
Sonal
Nube Technologies 





On Tue, Sep 12, 2017 at 2:40 AM, Aakash Basu 
wrote:

> Hi,
>
> Can someone please clarify how we should calculate the parameters to be
> passed via spark-submit?
>
> Parameters as in -
>
> Cores, NumExecutors, DriverMemory, etc.
>
> Is there any generic calculation which works for most kinds of clusters,
> with sizes ranging from a small 3 nodes to 100s of nodes?
>
> Thanks,
> Aakash.
>


Re: Reading PDF/text/word file efficiently with Spark

2017-05-23 Thread Sonal Goyal
Hi,

Sorry, it's not clear to me whether you want help moving the data to the
cluster or defining the best structure for your files on the cluster for
efficient processing. Are you on standalone or using HDFS?

On Tuesday, May 23, 2017, docdwarf  wrote:

> tesmai4 wrote
> > I am converting my Java based NLP parser to execute it on my Spark
> > cluster.  I know that Spark can read multiple text files from a directory
> > and convert into RDDs for further processing. My input data is not only
> > in text files, but in a multitude of different file formats.
> >
> > My question is: How can I efficiently read the input files
> > (PDF/Text/Word/HTML) in my Java based Spark program for processing these
> > files in Spark cluster.
>
> I will suggest flume. Flume is a distributed, reliable, and available
> service for efficiently collecting, aggregating, and moving large amounts
> of log data.
>
> I will also mention kafka. Kafka is a distributed streaming platform.
>
> It is also popular to use both flume and kafka together (flafka).
>
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Reading-PDF-text-word-file-efficiently-with-Spark-
> tp28699p28705.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>

-- 
Thanks,
Sonal
Nube Technologies 




Re: Adding worker dynamically in standalone mode

2017-05-15 Thread Sonal Goyal
If I remember correctly, you just start the worker with the current
master's URL.

On Monday, May 15, 2017, Seemanto Barua  wrote:

> Hi
>
> Is it possible to add a worker dynamically to the master in standalone
> mode. If so can you please share the steps on how to ?
> Thanks
>


-- 
Thanks,
Sonal
Nube Technologies 




Re: Monitoring the User Metrics for a long running Spark Job

2016-12-07 Thread Sonal Goyal
You can try updating metrics.properties for the sink of your choice. In our
case, we add the following to get application metrics in JSON format
over HTTP:

*.sink.reifier.class=org.apache.spark.metrics.sink.MetricsServlet

Here, we have defined a sink named reifier whose class is
MetricsServlet. You can then poll /metrics/applications/json.
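
Once the JSON is fetched, picking values out of it is straightforward. The
sketch below assumes the payload follows the usual layout of the metrics
library behind the servlet, with a top-level "gauges" object whose entries
each carry a "value". That layout, the function name extract_gauges, and the
metric name in the sample are assumptions, so check them against a real
response from your cluster.

```python
import json


def extract_gauges(payload):
    """Map each gauge name to its value from a metrics JSON document."""
    document = json.loads(payload)
    gauges = document.get("gauges", {})
    return {name: gauge["value"] for name, gauge in gauges.items()}


# Hand-written sample in the assumed layout; the metric name is invented.
sample_payload = '{"gauges": {"example.app.status": {"value": 1}}, "counters": {}}'
gauges = extract_gauges(sample_payload)
print(gauges)
```

In a polling loop the payload would come from an HTTP GET on the path
above, for example with urllib.request.urlopen, and the extracted values
could be pushed to whatever monitoring system you already use.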

Take a look at https://github.com/hammerlab/spark-json-relay if it serves
your need.

Thanks,
Sonal
Nube Technologies 





On Wed, Dec 7, 2016 at 1:10 AM, Chawla,Sumit  wrote:

> Any pointers on this?
>
> Regards
> Sumit Chawla
>
>
> On Mon, Dec 5, 2016 at 8:30 PM, Chawla,Sumit 
> wrote:
>
>> An example implementation I found is:
>> https://github.com/groupon/spark-metrics
>>
>> Does anyone have any experience using this?  I am more interested in
>> something for PySpark specifically.
>>
>> The above link pointed to
>> https://github.com/apache/spark/blob/master/conf/metrics.properties.template.
>> I need to spend some time reading it, but any quick pointers will be
>> appreciated.
>>
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Mon, Dec 5, 2016 at 8:17 PM, Chawla,Sumit 
>> wrote:
>>
>>> Hi Manish
>>>
>>> I am specifically looking for something similar to the following:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/common/index.html#accumulators--counters
>>>
>>> Flink has this concept of Accumulators, where the user can keep custom
>>> counters etc.  While the application is executing, these counters are
>>> queryable through the REST API provided by the Flink Monitoring Backend.
>>> This way you don't have to wait for the program to complete.
>>>
>>>
>>>
>>> Regards
>>> Sumit Chawla
>>>
>>>
>>> On Mon, Dec 5, 2016 at 5:53 PM, manish ranjan 
>>> wrote:
>>>
>>>> http://spark.apache.org/docs/latest/monitoring.html
>>>>
>>>> You can even install tools like dstat, iostat, iotop, and *collectd*,
>>>> which can provide fine-grained profiling on individual nodes.
>>>>
>>>> If you are using Mesos as the resource manager, Mesos exposes metrics
>>>> as well for the running job.
>>>>
>>>> Manish
>>>>
>>>> ~Manish
>>>>
>>>> On Mon, Dec 5, 2016 at 4:17 PM, Chawla,Sumit wrote:
>>>>
>>>>> Hi All
>>>>>
>>>>> I have a long running job which takes hours and hours to process
>>>>> data.  How can I monitor the operational efficiency of this job?  I am
>>>>> interested in something like Storm/Flink style user metrics/aggregators,
>>>>> which I can monitor while my job is running.  Using these metrics I
>>>>> want to monitor per-partition performance in processing items.  As of
>>>>> now, the only way for me to get these metrics is when the job finishes.
>>>>>
>>>>> One possibility is that Spark can flush the metrics to an external
>>>>> system every few seconds, and thus use an external system to monitor
>>>>> these metrics.  However, I wanted to see if Spark supports any such
>>>>> use case OOB.
>>>>>
>>>>> Regards
>>>>> Sumit Chawla
>>>>>
>>>>

>>>
>>
>


Re: javac - No such file or directory

2016-11-09 Thread Sonal Goyal
It looks like an issue with the Java compiler. Is the JDK set up correctly?
Please check your Java installation.

Thanks,
Sonal
Nube Technologies 





On Wed, Nov 9, 2016 at 7:13 PM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:

> I'm getting this error trying to build Spark on CentOS 7. It is not
> googling very well:
>
> [error] (tags/compile:compileIncremental) java.io.IOException: Cannot run
> program
> "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.111-1.b15.el7_2.x86_64/bin/javac"
> (in directory "/home/spark/spark"): error=2, No such file or directory
>
> Any ideas?
>
> Thanks,
>
> Andrew
>
>
>
> Full output:
>
> [success] created output: /home/spark/spark/external/kafka-0-10/target
>
> [info] Compiling 2 Scala sources and 8 Java sources to
> /home/spark/spark/common/tags/target/scala-2.11/classes...
>
> java.io.IOException: Cannot run program "/usr/lib/jvm/java-1.8.0-
> openjdk-1.8.0.111-1.b15.el7_2.x86_64/bin/javac" (in directory
> "/home/spark/spark"): error=2, No such file or directory
>
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>
> at sbt.SimpleProcessBuilder.run(ProcessImpl.scala:349)
>
> at sbt.AbstractProcessBuilder.run(ProcessImpl.scala:128)
>
> at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(
> ProcessImpl.scala:159)
>
> at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(
> ProcessImpl.scala:159)
>
> at sbt.compiler.javac.JavacLogger.buffer(JavacProcessLogger.scala:31)
>
> at sbt.AbstractProcessBuilder.runBuffered(ProcessImpl.scala:159)
>
> at sbt.AbstractProcessBuilder.$bang(ProcessImpl.scala:156)
>
> at sbt.compiler.javac.ForkedJava$$anonfun$launch$1.apply(
> ForkedJava.scala:24)
>
> at sbt.compiler.javac.ForkedJava$$anonfun$launch$1.apply(
> ForkedJava.scala:17)
>
> at sbt.compiler.javac.ForkedJava$$anonfun$withArgumentFile$1.
> apply(ForkedJava.scala:47)
>
> at sbt.compiler.javac.ForkedJava$$anonfun$withArgumentFile$1.
> apply(ForkedJava.scala:44)
>
> at sbt.IO$.withTemporaryDirectory(IO.scala:344)
>
> at sbt.compiler.javac.ForkedJava$.withArgumentFile(ForkedJava.scala:44)
>
> at sbt.compiler.javac.ForkedJava$.launch(ForkedJava.scala:17)
>
> at sbt.compiler.javac.ForkedJavaCompiler.run(ForkedJava.scala:68)
>
> at sbt.compiler.javac.JavaCompilerAdapter.compileWithReporter(
> JavaCompilerAdapter.scala:31)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler$$anonfun$compile$1.apply$mcV$
> sp(AnalyzingJavaCompiler.scala:65)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler$$anonfun$compile$1.apply(
> AnalyzingJavaCompiler.scala:65)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler$$anonfun$compile$1.apply(
> AnalyzingJavaCompiler.scala:65)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler.timed(
> AnalyzingJavaCompiler.scala:93)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler.compile(
> AnalyzingJavaCompiler.scala:64)
>
> at sbt.compiler.MixedAnalyzingCompiler$$anonfun$compileJava$1$1.apply$
> mcV$sp(MixedAnalyzingCompiler.scala:60)
>
> at sbt.compiler.MixedAnalyzingCompiler$$anonfun$compileJava$1$1.apply(
> MixedAnalyzingCompiler.scala:60)
>
> at sbt.compiler.MixedAnalyzingCompiler$$anonfun$compileJava$1$1.apply(
> MixedAnalyzingCompiler.scala:60)
>
> at sbt.compiler.MixedAnalyzingCompiler.timed(MixedAnalyzingCompiler.scala:
> 74)
>
> at sbt.compiler.MixedAnalyzingCompiler.compileJava$1(
> MixedAnalyzingCompiler.scala:59)
>
> at sbt.compiler.MixedAnalyzingCompiler.compile(
> MixedAnalyzingCompiler.scala:64)
>
> at sbt.compiler.IC$$anonfun$compileInternal$1.apply(
> IncrementalCompiler.scala:160)
>
> at sbt.compiler.IC$$anonfun$compileInternal$1.apply(
> IncrementalCompiler.scala:160)
>
> at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:66)
>
> at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:64)
>
> at sbt.inc.IncrementalCommon.cycle(IncrementalCommon.scala:32)
>
> at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:68)
>
> at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:67)
>
> at sbt.inc.Incremental$.manageClassfiles(Incremental.scala:95)
>
> at sbt.inc.Incremental$.compile(Incremental.scala:67)
>
> at sbt.inc.IncrementalCompile$.apply(Compile.scala:54)
>
> at sbt.compiler.IC$.compileInternal(IncrementalCompiler.scala:160)
>
> at sbt.compiler.IC$.incrementalCompile(IncrementalCompiler.scala:138)
>
> at sbt.Compiler$.compile(Compiler.scala:152)
>
> at sbt.Compiler$.compile(Compiler.scala:138)
>
> at sbt.Defaults$.sbt$Defaults$$compileIncrementalTaskImpl(
> Defaults.scala:860)
>
> at sbt.Defaults$$anonfun$compileIncrementalTask$1.
> apply(Defaults.scala:851)
>
> at sbt.Defaults$$anonfun$compileIncrementalTask$1.
> apply(Defaults.scala:849)
>
> at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>
> at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
>
> at sbt.std.Transform$$anon$4.work(System.scala:63)
>
> at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
>
> at 

Re: Open source Spark based projects

2016-09-22 Thread Sonal Goyal
https://spark-packages.org/



Thanks,
Sonal
Nube Technologies 





On Thu, Sep 22, 2016 at 3:48 PM, Sean Owen  wrote:

> https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
> and maybe related ...
> https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark
>
> On Thu, Sep 22, 2016 at 11:15 AM, tahirhn  wrote:
> > I am planning to write a thesis on certain aspects (i.e. testing,
> > performance optimisation, security) of Apache Spark. I need to study
> > some projects that are based on Apache Spark and are available as open
> > source.
> >
> > If you know any such project (open source Spark based project), please
> > share it here. Thanks
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Open-source-Spark-based-projects-tp27778.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>
>
>


Re: How does wholeTextFiles() work in Spark-Hadoop Cluster?

2016-09-08 Thread Sonal Goyal
Are you looking at the worker logs or the driver?

On Thursday, September 8, 2016, Nisha Menon  wrote:

> I have an RDD created as follows:
>
> JavaPairRDD<String, String> inputDataFiles =
>     sparkContext.wholeTextFiles("hdfs://ip:8020/user/cdhuser/inputFolder/");
>
> On this RDD I perform a map to process individual files and invoke a
> foreach to trigger the same map.
>
> JavaRDD<Object[]> output = inputDataFiles.map(
>     new Function<Tuple2<String, String>, Object[]>() {
>
>   private static final long serialVersionUID = 1L;
>
>   @Override
>   public Object[] call(Tuple2<String, String> v1) throws Exception {
>     System.out.println("in map!");
>     // do something with v1.
>     return new Object[0]; // placeholder result
>   }
> });
>
> output.foreach(new VoidFunction<Object[]>() {
>
>   private static final long serialVersionUID = 1L;
>
>   @Override
>   public void call(Object[] t) throws Exception {
>     // do nothing!
>     System.out.println("in foreach!");
>   }
> });
>
> This code works perfectly fine for standalone setup on my local laptop
> while accessing both local files as well as remote HDFS files.
>
> In cluster mode the same code produces no results. My intuition is that the
> data has not reached the individual executors and hence neither the `map`
> nor the `foreach` works. That is only a guess, and I am not able to figure
> out why this would not work in the cluster. I don't even see the print
> statements in `map` and `foreach` getting printed in cluster mode of
> execution.
>
> I notice a particular line in standalone output that I do NOT see in
> cluster execution.
>
> *16/09/07 17:35:35 INFO WholeTextFileRDD: Input split:
> Paths:/user/cdhuser/inputFolder/data1.txt:0+657345,/user/cdhuser/inputFolder/data10.txt:0+657345,/user/cdhuser/inputFolder/data2.txt:0+657345,/user/cdhuser/inputFolder/data3.txt:0+657345,/user/cdhuser/inputFolder/data4.txt:0+657345,/user/cdhuser/inputFolder/data5.txt:0+657345,/user/cdhuser/inputFolder/data6.txt:0+657345,/user/cdhuser/inputFolder/data7.txt:0+657345,/user/cdhuser/inputFolder/data8.txt:0+657345,/user/cdhuser/inputFolder/data9.txt:0+657345*
>
> I had a similar code with textFile() that worked earlier for individual
> files on cluster. The issue is with wholeTextFiles() only.
>
> Please advise what is the best way to get this working or other alternate
> ways.
>
> My setup is cloudera 5.7 distribution with Spark Service. I used the
> master as `yarn-client`.
>
> The action can be anything. It's just a dummy step to invoke the map. I
> also tried *System.out.println("Count is:"+output.count());*, for which I
> got the correct answer of `10`, since there were 10 files in the folder,
> but still the map refuses to work.
>
> Thanks.
>
>

-- 
Thanks,
Sonal
Nube Technologies 




Re: [Spark submit] getting error when use properties file parameter in spark submit

2016-09-06 Thread Sonal Goyal
Looks like a classpath issue - Caused by: java.lang.ClassNotFoundException:
com.amazonaws.services.s3.AmazonS3

Are you using S3 somewhere? Are the required jars in place?
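
One way to check whether the required jars are in place is to look inside the jars on the classpath for the missing class. The following is only a minimal sketch in plain Python (a jar is a zip archive); the helper name `jars_containing` and the directory you point it at are illustrative assumptions, not something from this thread.

```python
import zipfile
from pathlib import Path


def jars_containing(class_name, jar_dir):
    """Return the names of the jar files in jar_dir that contain class_name.

    class_name is a dotted Java name, for example
    "com.amazonaws.services.s3.AmazonS3".
    """
    # A class a.b.C is stored in a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(jar_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are not valid archives.
            continue
    return hits
```

If no jar in the directories used by the job contains the class, the jar that provides it is probably missing from the classpath. Why the error only appears with the properties file is not established in this thread.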

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Tue, Sep 6, 2016 at 4:45 PM, Divya Gehlot 
wrote:

> Hi,
> I am getting the error below if I try to use the properties file parameter in
> spark-submit
>
> Exception in thread "main" java.util.ServiceConfigurationError:
> org.apache.hadoop.fs.FileSystem: Provider org.apache.hadoop.fs.s3a.S3AFileSystem
> could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2673)
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2684)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2701)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2737)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2719)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:375)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
> at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:142)
> at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:653)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
> at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:651)
> at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
> Caused by: java.lang.NoClassDefFoundError: com/amazonaws/services/s3/AmazonS3
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
> at java.lang.Class.getConstructor0(Class.java:2895)
> at java.lang.Class.newInstance(Class.java:354)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 19 more
> Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.AmazonS3
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 24 more
> End of LogType:stderr
>
> If I remove the --properties-file parameter
> the error is gone
>
> Would really appreciate the help .
>
>
>
> Thanks,
> Divya
>


Re: Avoid Cartesian product in calculating a distance matrix?

2016-08-06 Thread Sonal Goyal
The general approach to the Cartesian problem is to first block or index
your rows so that similar items fall in the same bucket, and then join
within each bucket. Is that possible in your case?
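
To give a feel for why blocking helps, here is a rough sketch in plain Python that only counts comparisons; it does not compute any correlation. The list size of 15000 comes from the question below. The 100 equally sized buckets are a purely hypothetical assumption, and real bucket sizes depend on how the rows are indexed.

```python
from collections import Counter


def all_pairs(n):
    """Number of unordered pairs among n items: n * (n - 1) / 2."""
    return n * (n - 1) // 2


def blocked_pairs(bucket_keys):
    """Number of pairs when items are compared only within their own bucket."""
    sizes = Counter(bucket_keys)
    return sum(all_pairs(size) for size in sizes.values())


n = 15000                          # list size mentioned in the question
full = all_pairs(n)                # every element against every other

# Hypothetical blocking: 100 buckets of 150 items each.
keys = [i % 100 for i in range(n)]
blocked = blocked_pairs(keys)      # comparisons left after blocking
```

Note the trade-off: blocking skips every pair that spans two buckets, so it fits cases where only similar items matter. If the complete distance matrix is required, all n * (n - 1) / 2 pairs still have to be computed.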

On Friday, August 5, 2016, Paschalis Veskos  wrote:

> Hello everyone,
>
> I am interested in running an application on Spark that at some point
> needs to compare all elements of an RDD against all others to create a
> distance matrix. The RDD is of type  and the Pearson
> correlation is applied to each element against all others, generating
> a matrix with the distance between all possible combinations of
> elements.
>
> I have implemented this by taking the cartesian product of the RDD
> with itself, filtering half the matrix away since it is symmetric,
> then doing a combineByKey to get all other elements that it needs to
> be compared with. I map the output of this over the comparison
> function implementing the Pearson correlation.
>
> You can probably guess this is dead slow. I use Spark 1.6.2, the code
> is written in Java 8. At the rate it is processing in a cluster with 4
> nodes with 16cores and 56gb ram each, for a list with ~15000 elements
> split in 512 partitions, the cartesian operation alone is estimated to
> take about 3000 hours (all cores are maxed out on all machines)!
>
> Is there a way to avoid the cartesian product to calculate what I
> want? Would a DataFrame join be faster? Or is this an operation that
> just requires a much larger cluster?
>
> Thank you,
>
> Paschalis
>
>
>

-- 
Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015





Re: Generating unique id for a column in Row without breaking into RDD and joining back

2016-08-05 Thread Sonal Goyal
Hi Tony,

Would hash on the bid work for you?

hash(cols: Column*): Column

Calculates the hash code of given columns, and returns the result as an int
column.
Annotations: @varargs(). Since: 2.0
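
One caveat worth checking before relying on a hash as an id: a hash code is not guaranteed to be unique, and an int column has only 2^32 possible values. The sketch below is plain Python, not Spark code. It assumes the hash spreads keys uniformly (an assumption) and uses the standard birthday-bound approximation. `assign_ids` is a hypothetical helper showing the collision-free alternative of numbering distinct values.

```python
import math


def collision_probability(n_keys, bits=32):
    """Estimate the chance of at least one collision among n_keys values
    hashed uniformly into 2**bits codes (birthday-bound approximation)."""
    space = 2 ** bits
    return 1.0 - math.exp(-n_keys * (n_keys - 1) / (2.0 * space))


def assign_ids(bids):
    """Give each distinct bid a unique integer id in first-seen order."""
    ids = {}
    for bid in bids:
        if bid not in ids:
            ids[bid] = len(ids)
    return ids
```

For a few thousand distinct bids the estimated collision risk is small, but it grows quickly with the number of keys, so whether a hash is acceptable depends on the size of the dataset.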

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Fri, Aug 5, 2016 at 7:41 PM, Tony Lane  wrote:

> Ayan - basically I have a dataset with the following structure, where the
> bid values are unique strings
>
> bid: String
> val : integer
>
> I need unique int values for these string bids to do some processing in
> the dataset
>
> like
>
> id:int   (unique integer id for each bid)
> bid:String
> val:integer
>
>
>
> -Tony
>
> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha  wrote:
>
>> Hi
>>
>> Can you explain a little further?
>>
>> best
>> Ayan
>>
>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane 
>> wrote:
>>
>>> I have a row with structure like
>>>
>>> identifier: String
>>> value: int
>>>
>>> All identifier are unique and I want to generate a unique long id for
>>> the data and get a row object back for further processing.
>>>
>>> I understand using the zipWithUniqueId function on RDD, but that would
>>> mean first converting to RDD and then joining back the RDD and dataset
>>>
>>> What is the best way to do this ?
>>>
>>> -Tony
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Extracting key word from a textual column

2016-08-02 Thread Sonal Goyal
Hi Mich,

It seems like an entity resolution problem - looking at different
representations of an entity - SAINSBURY in this case and matching them all
together. How dirty is your data in the description - are there stop words
like SACAT/SMKT etc you can strip off and get the base retailer entity ?
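
As a rough illustration of the stop-word idea, here is a minimal sketch in plain Python. The stop-word set contains only the two tokens named above. The rule "drop tokens with digits, keep the first remaining word" is a simplifying assumption, and the helper names are hypothetical.

```python
import re
from collections import defaultdict

# Noise tokens to strip; only the two named above are included here.
STOP_WORDS = {"SACAT", "SMKT"}


def base_retailer(description):
    """Reduce a transaction description to a rough retailer key.

    Upper-cases the text, drops stop words and tokens containing digits,
    and keeps the first remaining word. Returns "" if nothing is left.
    """
    tokens = re.findall(r"[A-Z0-9']+", description.upper())
    words = [
        t for t in tokens
        if t not in STOP_WORDS and not any(c.isdigit() for c in t)
    ]
    return words[0] if words else ""


def spend_by_retailer(rows):
    """Sum amounts per retailer key for (description, amount) pairs."""
    totals = defaultdict(float)
    for description, amount in rows:
        totals[base_retailer(description)] += amount
    return dict(totals)
```

A first-word rule like this would break on multi-word retailer names and on spelling variants, so real data would likely need a lookup table or fuzzy matching on top.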

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Tue, Aug 2, 2016 at 9:55 PM, Mich Talebzadeh 
wrote:

> Thanks.
>
> I believe there is some catalog of companies that I can get and store
> in a table, and then match the company name against the
> transactiondescription column.
>
> That catalog should have sectors in it. For example company XYZ is under
> Grocers etc which will make search and grouping much easier.
>
> I believe Spark can do it, though I am generally interested in alternative
> ideas.
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 2 August 2016 at 16:26, Yong Zhang  wrote:
>
>> Well, if you still want to use window functions for your logic, then you
>> need to derive a new column, like "catalog", and use it as part of the
>> grouping logic.
>>
>>
>> Maybe you can use a regex to derive this new column. The
>> implementation depends on your data in "transactiondescription",
>> and regex gives you the most powerful way to handle your data.
>>
>>
>> This is really not a Spark question, but a question of how you process
>> your logic based on the data given.
>>
>>
>> Yong
>>
>>
>> --
>> *From:* Mich Talebzadeh 
>> *Sent:* Tuesday, August 2, 2016 10:00 AM
>> *To:* user @spark
>> *Subject:* Extracting key word from a textual column
>>
>> Hi,
>>
>> Need some ideas.
>>
>> *Summary:*
>>
>> I am working on a tool to slice and dice the amount of money I have spent
>> so far (meaning the whole data sample) on a given retailer so I have a
>> better idea of where I am wasting the money
>>
>> *Approach*
>>
>> Downloaded my bank statements from a given account in csv format from
>> inception till end of July. Read the data and stored it in ORC table.
>>
>> I am interested in all bills that I paid using Debit Card (
>> transactiontype = "DEB") that come out of the account directly.
>> Transactiontype is the three-character code lookup that I download as well.
>>
>> scala> ll_18740868.printSchema
>> root
>>  |-- transactiondate: date (nullable = true)
>>  |-- transactiontype: string (nullable = true)
>>  |-- sortcode: string (nullable = true)
>>  |-- accountnumber: string (nullable = true)
>>  |-- transactiondescription: string (nullable = true)
>>  |-- debitamount: double (nullable = true)
>>  |-- creditamount: double (nullable = true)
>>  |-- balance: double (nullable = true)
>>
>> The important fields are transactiondate, transactiontype,
>> transactiondescription and debitamount
>>
>> So using analytics and windowing I can do all sorts of things. For example
>> this one gives me the last time I spent money on retailer XYZ and the amount
>>
>> SELECT *
>> FROM (
>>   select transactiondate, transactiondescription, debitamount
>>   , rank() over (order by transactiondate desc) AS rank
>>   from accounts.ll_18740868 where transactiondescription like '%XYZ%'
>>  ) tmp
>> where rank <= 1
>>
>> And its equivalent using Windowing in FP
>>
>> import org.apache.spark.sql.expressions.Window
>> val wSpec =
>> Window.partitionBy("transactiontype").orderBy(desc("transactiondate"))
>> ll_18740868.filter(col("transactiondescription").contains("XYZ")).select($"transactiondate",$"transactiondescription",
>> rank().over(wSpec).as("rank")).filter($"rank"===1).show
>>
>>
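>> +---------------+----------------------+----+
>> |transactiondate|transactiondescription|rank|
>> +---------------+----------------------+----+
>> |     2015-12-15|      XYZ LTD CD 4636 |   1|
>> +---------------+----------------------+----+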
>>
>> So far so good. But if I want to find all I spent on each retailer, then
>> it gets trickier as a retailer appears like below in the column
>> transactiondescription:
>>
>>
>> ll_18740868.where($"transactiondescription".contains("SAINSBURY")).select($"transactiondescription").show(5)
>> 

Re: Tuning level of Parallelism: Increase or decrease?

2016-08-02 Thread Sonal Goyal
Hi Jestin,

Which of your actions is the bottleneck? Is it group by, count or the join?
Or all of them? It may help to tune the most time-consuming task first.

On Monday, August 1, 2016, Nikolay Zhebet  wrote:

> Yes, Spark always tries to deliver the code to the data (not vice
> versa). But you should realize that if you run groupBy or join on
> a large dataset, the locally grouped temporary data always has to move
> from one worker node to another (this is the shuffle operation, as far as I
> know). At the end of all batch processes, you can fetch your grouped
> dataset. But under the hood you will see a lot of network connections
> between worker nodes, because all your 2 TB of data was split into 128 MB
> parts and written to different HDFS DataNodes.
>
> As an example: you analyze your workflow and realize that in most cases
> you group your data by date(-mm-dd). In this case you can save the data
> for a whole day in one Region Server (if you use the Spark-on-HBase
> DataFrame). Then your "group by date" operation can be done on the local
> worker node, without shuffling your temporary data between other
> worker nodes. Maybe this article can be useful:
> http://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/
>
> 2016-08-01 18:56 GMT+03:00 Jestin Ma  >:
>
>> Hi Nikolay, I'm looking at data locality improvements for Spark, and I
>> have conflicting sources on using YARN for Spark.
>>
>> Reynold said that Spark workers automatically take care of data locality
>> here:
>> https://www.quora.com/Does-Apache-Spark-take-care-of-data-locality-when-Spark-workers-load-data-from-HDFS
>>
>> However, I've read elsewhere (
>> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/yarn/)
>> that Spark on YARN increases data locality because YARN tries to place
>> tasks next to HDFS blocks.
>>
>> Can anyone verify/support one side or the other?
>>
>> Thank you,
>> Jestin
>>
>> On Mon, Aug 1, 2016 at 1:15 AM, Nikolay Zhebet > > wrote:
>>
>>> Hi.
>>> Maybe "data locality" can help you.
>>> If you use groupBy and joins, then most likely you will see a lot of
>>> network operations. This can be very slow. You can try to prepare and
>>> transform your data in a way that minimizes moving temporary
>>> data between worker nodes.
>>>
>>> Try searching for "Data locality in Hadoop".
>>>
>>>
>>> 2016-08-01 4:41 GMT+03:00 Jestin Ma >> >:
>>>
 It seems that the number of tasks being this large does not matter. Each
 task was set by default by HDFS to 128 MB (the block size), which I've heard
 is OK. I've tried tuning the block (task) size to be larger and smaller to
 no avail.

 I tried coalescing to 50 but that introduced large data skew and slowed
 down my job a lot.

 On Sun, Jul 31, 2016 at 5:27 PM, Andrew Ehrlich > wrote:

> 15000 seems like a lot of tasks for that size. Test it out with a
> .coalesce(50) placed right after loading the data. It will probably either
> run faster or crash with out of memory errors.
>
> On Jul 29, 2016, at 9:02 AM, Jestin Ma  > wrote:
>
> I am processing ~2 TB of hdfs data using DataFrames. The size of a
> task is equal to the block size specified by hdfs, which happens to be 128
> MB, leading to about 15000 tasks.
>
> I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.
> I'm performing groupBy, count, and an outer-join with another
> DataFrame of ~200 MB size (~80 MB cached but I don't need to cache it),
> then saving to disk.
>
> Right now it takes about 55 minutes, and I've been trying to tune it.
>
> I read on the Spark Tuning guide that:
> *In general, we recommend 2-3 tasks per CPU core in your cluster.*
>
> This means that I should have about 30-50 tasks instead of 15000, and
> each task would be much bigger in size. Is my understanding correct, and
> is this suggested? I've read from different sources to decrease or increase
> parallelism, or even keep it default.
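
The arithmetic behind these numbers can be worked through with a small sketch in plain Python. It reads the quoted guideline as applying to every core in the cluster (5 nodes with 16 cores each), which is an interpretation of that sentence rather than a tuning recommendation. The helper names are hypothetical.

```python
def recommended_tasks(nodes, cores_per_node, low=2, high=3):
    """Apply the '2-3 tasks per CPU core in your cluster' rule of thumb."""
    total_cores = nodes * cores_per_node
    return total_cores * low, total_cores * high


def block_count(total_bytes, block_bytes):
    """Number of blocks needed to hold total_bytes (ceiling division)."""
    return -(-total_bytes // block_bytes)


TB = 1024 ** 4
MB = 1024 ** 2

# 5 worker nodes with 16 cores each, as described in the question.
tasks_range = recommended_tasks(nodes=5, cores_per_node=16)

# ~2 TB of input in 128 MB blocks gives the order of magnitude of the task count.
blocks = block_count(2 * TB, 128 * MB)
```

Under this reading the guideline gives 160 to 240 tasks for 80 cores. A figure of 30-50 corresponds to a single 16-core node. The block count of 16384 is in line with the "about 15000" tasks reported for roughly 2 TB of data.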
>
> Thank you for your help,
> Jestin
>
>
>

>>>
>>
>

-- 
Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015





Re: What are using Spark for

2016-08-02 Thread Sonal Goyal
Hi Rohit,

You can check the powered by spark page for some real usage of Spark.

https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark

On Tuesday, August 2, 2016, Rohit L  wrote:

> Hi Everyone,
>
>   I want to know the real world uses cases for which Spark is used and
> hence can you please share for what purpose you are using Apache Spark in
> your project?
>
> --
> Rohit
>


-- 
Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015





Re: Any reference of performance tuning on SparkSQL?

2016-07-28 Thread Sonal Goyal
I found some references at

http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-tuning-in-Spark-SQL-td21871.html

HTH

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Thu, Jul 28, 2016 at 12:40 PM, Linyuxin  wrote:

> Hi ALL
>
>  Is there any  reference of performance tuning on SparkSQL?
>
> I can only find material about tuning Spark core on http://spark.apache.org/
>


Re: Possible to broadcast a function?

2016-06-29 Thread Sonal Goyal
Have you looked at Alluxio (formerly Tachyon)?

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Wed, Jun 29, 2016 at 7:30 PM, Aaron Perrin  wrote:

> The user guide describes a broadcast as a way to move a large dataset to
> each node:
>
> "Broadcast variables allow the programmer to keep a read-only variable
> cached on each machine rather than shipping a copy of it with tasks. They
> can be used, for example, to give every node a copy of a large input
> dataset in an efficient manner."
>
> And the broadcast example shows it being used with a variable.
>
> But, is it somehow possible to instead broadcast a function that can be
> executed once, per node?
>
> My use case is the following:
>
> I have a large data structure that I currently create on each executor.
> The way that I create it is a hack.  That is, when the RDD function is
> executed on the executor, I block, load a bunch of data (~250 GiB) from an
> external data source, create the data structure as a static object in the
> JVM, and then resume execution.  This works, but it ends up costing me a
> lot of extra memory (i.e. a few TiB when I have a lot of executors).
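
The pattern described here (build the structure on first use and keep it as a static object) can be sketched as a thread-safe lazy singleton. This is plain Python for illustration only; `_load_from_external_source` is a hypothetical stand-in for the expensive load, and `load_log` exists only to show how often the load runs.

```python
import threading

_lock = threading.Lock()
_holder = {}          # holds the single shared instance once it is built
load_log = []         # instrumentation for this example only


def _load_from_external_source():
    """Hypothetical stand-in for the expensive load from an external source."""
    load_log.append("loaded")
    return {"ready": True}


def get_structure():
    """Return the shared structure, building it on the first call only."""
    if "value" not in _holder:
        with _lock:
            # Re-check under the lock so concurrent callers load only once.
            if "value" not in _holder:
                _holder["value"] = _load_from_external_source()
    return _holder["value"]
```

A singleton like this gives one copy per process. With several executor processes on the same node, each one would still hold its own copy, which matches the memory cost described above.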
>
> What I'd like to do is use the broadcast mechanism to load the data
> structure once, per node.  But, I can't serialize the data structure from
> the driver.
>
> Any ideas?
>
> Thanks!
>
> Aaron
>
>


Re: OOM on the driver after increasing partitions

2016-06-22 Thread Sonal Goyal
What does your application do?

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Hello All,
>
> We have a Spark cluster where driver and master are running on the same
> node. We are using Spark Standalone cluster manager. If the number of nodes
> (and the partitions) is increased, the same dataset that used to run to
> completion on a smaller number of nodes now gives an out-of-memory error on
> the driver.
>
> For example, a dataset that runs on 32 nodes with number of partitions set
> to 256 completes whereas the same dataset when run on 64 nodes with number
> of partitions as 512 gives an OOM on the driver side.
>
> From what I read in the Spark documentation and other articles, following
> are the responsibilities of the driver/master.
>
> 1) create spark context
> 2) build DAG of operations
> 3) schedule tasks
>
> I am guessing that 1) and 2) should not change w.r.t. the number of
> nodes/partitions. So is it that, since the driver has to keep track of a lot
> more tasks, it gives an OOM?
>
> What could be the possible reasons behind the driver-side OOM when the
> number of partitions is increased?
>
> Regards,
> Raghava.
>


Re: GraphX Java API

2016-05-31 Thread Sonal Goyal
It's very much possible to use GraphX through Java, though some boilerplate
may be needed. Here is an example.

Create a graph from edge and vertex RDDs (JavaRDD<Tuple2<Object, Long>>
vertices, JavaRDD<Edge<Long>> edges)


ClassTag<Long> longTag = scala.reflect.ClassTag$.MODULE$.apply(Long.class);
Graph<Long, Long> graph = Graph.apply(vertices.rdd(),
edges.rdd(), 0L, StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),
longTag, longTag);



Then basically you can call graph.ops() and use the available operations like
triangle counting, etc.

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Tue, May 31, 2016 at 11:40 AM, Santoshakhilesh <
santosh.akhil...@huawei.com> wrote:

> Hi ,
>
> Scala has a similar package structure to Java and ultimately runs on the
> JVM, so you probably get the impression that it's in Java.
>
> As far as I know there is no Java API for GraphX. I had used GraphX last
> year and at that time I had to code in Scala to use the GraphX APIs.
>
> Regards,
> Santosh Akhilesh
>
>
>
>
>
> *From:* Kumar, Abhishek (US - Bengaluru) [mailto:
> abhishekkuma...@deloitte.com]
> *Sent:* 30 May 2016 13:24
> *To:* Santoshakhilesh; user@spark.apache.org
> *Cc:* Golatkar, Jayesh (US - Bengaluru); Soni, Akhil Dharamprakash (US -
> Bengaluru); Matta, Rishul (US - Bengaluru); Aich, Risha (US - Bengaluru);
> Kumar, Rajinish (US - Bengaluru); Jain, Isha (US - Bengaluru); Kumar,
> Sandeep (US - Bengaluru)
> *Subject:* RE: GraphX Java API
>
>
>
> Hey,
>
> ·   I see some graphx packages listed here:
>
> http://spark.apache.org/docs/latest/api/java/index.html
>
> ·   org.apache.spark.graphx
> 
>
> ·   org.apache.spark.graphx.impl
> 
>
> ·   org.apache.spark.graphx.lib
> 
>
> ·   org.apache.spark.graphx.util
> 
>
> Aren’t they meant to be used with JAVA?
>
> Thanks
>
>
>
> *From:* Santoshakhilesh [mailto:santosh.akhil...@huawei.com
> ]
> *Sent:* Friday, May 27, 2016 4:52 PM
> *To:* Kumar, Abhishek (US - Bengaluru) ;
> user@spark.apache.org
> *Subject:* RE: GraphX Java API
>
>
>
> GraphX APIs are available only in Scala. If you need to use GraphX you
> need to switch to Scala.
>
>
>
> *From:* Kumar, Abhishek (US - Bengaluru) [
> mailto:abhishekkuma...@deloitte.com ]
> *Sent:* 27 May 2016 19:59
> *To:* user@spark.apache.org
> *Subject:* GraphX Java API
>
>
>
> Hi,
>
>
>
> We are trying to consume the Java API for GraphX, but there is no
> documentation available online on the usage or examples. It would be great
> if we could get some examples in Java.
>
>
>
> Thanks and regards,
>
>
>
> *Abhishek Kumar*
>
>
>
>
>
>
>
> This message (including any attachments) contains confidential information
> intended for a specific individual and purpose, and is protected by law. If
> you are not the intended recipient, you should delete this message and any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, by you is strictly prohibited.
>
> v.E.1
>
>
>
>
>
>
>
>
>


Re: Error while saving plots

2016-05-26 Thread Sonal Goyal
Does the path /home/njoshi/dev/outputs/test_/plots/  exist on the driver ?
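
Two things could be worth checking, both of them assumptions rather than confirmed causes: the parent directory may not exist yet, and the path in the exception starts with a "file:" scheme, which plain file APIs tend to treat as part of the file name. A minimal sketch of handling both, in plain Python with a hypothetical helper name:

```python
import os
from urllib.parse import urlparse


def local_path_with_parent(target):
    """Turn a 'file:' URI or a plain path into a local path and
    make sure its parent directory exists."""
    parsed = urlparse(target)
    # Strip the scheme only when it is actually 'file'.
    path = parsed.path if parsed.scheme == "file" else target
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    return path
```

The returned path can then be handed to whatever writes the image. The same two steps (strip the scheme, create the parent directory) would apply in the Scala driver before calling saveAsPNG.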

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Wed, May 25, 2016 at 2:07 AM, njoshi  wrote:

> For an analysis app, I have to make ROC curves on the fly and save to
> disc. I
> am using scala-chart for this purpose and doing the following in my Spark
> app:
>
>
> val rocs = performances.map{case (id, (auRoc, roc)) => (id,
> roc.collect().toList)}
> XYLineChart(rocs.toSeq, title = "Pooled Data Performance:
> AuROC").saveAsPNG(outputpath + "/plots/global.png")
>
>
> However, I am getting the following exception. Does anyone have an idea of
> the cause?
>
>
> Exception in thread "main" java.io.FileNotFoundException:
> file:/home/njoshi/dev/outputs/test_/plots/global.png (No such file or
> directory)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:101)
> at scalax.chart.exporting.PNGExporter$.saveAsPNG$extension(exporting.scala:138)
> at com.aol.advertising.ml.globaldata.EvaluatorDriver$.main(EvaluatorDriver.scala:313)
> at com.aol.advertising.ml.globaldata.EvaluatorDriver.main(EvaluatorDriver.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> Thanks in advance,
>
> Nikhil
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-saving-plots-tp27016.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Cartesian join on RDDs taking too much time

2016-05-25 Thread Sonal Goyal
You can look at ways to group records from both rdds together instead of
doing Cartesian.  Say generate pair rdd from each with first letter as key.
Then do a partition and a join.
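
A minimal sketch of that idea in plain Python, using the example strings from the thread below. `difflib.SequenceMatcher` is only a stand-in for whatever similarity measure is actually used, and the 0.85 threshold mirrors the "greater than 85% match" requirement. The function names are hypothetical.

```python
from collections import defaultdict
from difflib import SequenceMatcher


def block_by_first_letter(strings):
    """Group strings into buckets keyed by their lower-cased first letter."""
    buckets = defaultdict(list)
    for s in strings:
        if s:
            buckets[s[0].lower()].append(s)
    return buckets


def fuzzy_matches(a_strings, b_strings, threshold=0.85):
    """Compare only strings sharing a first letter and return
    (a, b, score) triples whose score is above the threshold."""
    a_buckets = block_by_first_letter(a_strings)
    b_buckets = block_by_first_letter(b_strings)
    results = []
    # Only keys present on both sides can produce pairs (the "join").
    for key in sorted(a_buckets.keys() & b_buckets.keys()):
        for a in a_buckets[key]:
            for b in b_buckets[key]:
                score = SequenceMatcher(None, a, b).ratio()
                if score > threshold:
                    results.append((a, b, score))
    return results


rdd_a = ["hi", "bye", "ch"]
rdd_b = ["padma", "hihi", "chch", "priya"]
```

With these inputs only two pairs are compared ("hi" with "hihi", "ch" with "chch") instead of all twelve. The cost is that matches differing in their first character are never considered, so the blocking key has to suit the data.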
On May 25, 2016 8:04 PM, "Priya Ch"  wrote:

> Hi,
>   RDD A is of size 30MB and RDD B is of size 8 MB. Upon matching, we would
> like to filter out the strings that have greater than 85% match and
> generate a score for it which is used in the subsequent calculations.
>
> I tried generating pair rdd from both rdds A and B with same key for all
> the records. Now performing A.join(B) is also resulting in huge execution
> time..
>
> How do I go about with map and reduce here ? To generate pairs from 2 rdds
> I don't think map can be used because we cannot have rdd inside another rdd.
>
> Would be glad if you can shed some light on this.
>
> Thanks,
> Padma Ch
>
> On Wed, May 25, 2016 at 7:39 PM, Jörn Franke  wrote:
>
>> Solr or Elasticsearch provide much more functionality and are faster in
>> this context. The decision for or against them depends on your current and
>> future use cases. Your current use case is still very abstract, so in order
>> to get a more proper recommendation you need to provide more details,
>> including the size of the dataset and what you do with the result of the
>> matching (do you just need the match count, or also the pairs in the
>> results?).
>>
>> Your concrete problem can also be solved in Spark (it is not the best and
>> most efficient tool for this, but it has other strengths) using the
>> map reduce steps. There are different ways to implement this: generate
>> pairs from the input datasets in the map step, or (maybe less recommendable)
>> broadcast the smaller dataset to all nodes and do the matching with the
>> bigger dataset there.
>> This highly depends on the data in your data set, how they compare in
>> size, etc.
>>
>>
>>
>> On 25 May 2016, at 13:27, Priya Ch  wrote:
>>
>> Why do I need to deploy Solr for text analytics? I have files placed in
>> HDFS. I just need to look for matches against each string in both files and
>> generate those records whose match is > 85%. We are trying fuzzy match
>> logic.
>>
>> How can I use map/reduce operations across 2 rdds ?
>>
>> Thanks,
>> Padma Ch
>>
>> On Wed, May 25, 2016 at 4:49 PM, Jörn Franke 
>> wrote:
>>
>>>
>>> Alternatively depending on the exact use case you may employ solr on
>>> Hadoop for text analytics
>>>
>>> > On 25 May 2016, at 12:57, Priya Ch 
>>> wrote:
>>> >
>>> > Lets say i have rdd A of strings as  {"hi","bye","ch"} and another RDD
>>> B of
>>> > strings as {"padma","hihi","chch","priya"}. For every string rdd A i
>>> need
>>> > to check the matches found in rdd B as such for string "hi" i have to
>>> check
>>> > the matches against all strings in RDD B which means I need generate
>>> every
>>> > possible combination r
>>>
>>
>>
>


Re: Spark for offline log processing/querying

2016-05-22 Thread Sonal Goyal
Hi Mat,

I think you could also use spark SQL to query the logs. Hope the following
link helps

https://databricks.com/blog/2014/09/23/databricks-reference-applications.html
On May 23, 2016 10:59 AM, "Mat Schaffer"  wrote:

> I'm curious about trying to use spark as a cheap/slow ELK
> (ElasticSearch,Logstash,Kibana) system. Thinking something like:
>
> - instances rotate local logs
> - copy rotated logs to s3
> (s3://logs/region/grouping/instance/service/*.logs)
> - spark to convert from raw text logs to parquet
> - maybe presto to query the parquet?
>
> I'm still new on Spark though, so thought I'd ask if anyone was familiar
> with this sort of thing and if there are maybe some articles or documents I
> should be looking at in order to learn how to build such a thing. Or if
> such a thing even made sense.
>
> Thanks in advance, and apologies if this has already been asked and I
> missed it!
>
> -Mat
>
> matschaffer.com
>


Re: strange HashPartitioner behavior in Spark

2016-04-18 Thread Sonal Goyal
Are you specifying your spark master in the scala program?

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Mon, Apr 18, 2016 at 6:26 PM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Mike,
>
> We tried that. This map task is actually part of a larger set of
> operations. I pointed out this map task since it involves partitionBy() and
> we always use partitionBy() whenever partition-unaware shuffle operations
> are performed (such as distinct). We in fact do not notice a change in the
> distribution after several unrelated stages are executed and a significant
> time has passed (nearly 10-15 minutes).
>
> I agree. We are not looking for partitions to go to specific nodes and nor
> do we expect a uniform distribution of keys across the cluster. There will
> be a skew. But it cannot be that all the data is on one node and nothing on
> the other and no, the keys are not the same. They vary from 1 to around
> 55000 (integers). What makes this strange is that it seems to work fine on
> the spark shell (REPL).
>
> Regards,
> Raghava.
>
>
> On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> A HashPartitioner will indeed partition based on the key, but you
>> cannot know on *which* node that key will appear. Again, the RDD
>> partitions will not necessarily be distributed evenly across your
>> nodes because of the greedy scheduling of the first wave of tasks,
>> particularly if those tasks have durations less than the initial
>> executor delay. I recommend you look at your logs to verify if this is
>> happening to you.
>>
>> Mike
>>
>> On 4/18/16, Anuj Kumar  wrote:
>> > Good point Mike +1
>> >
>> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> When submitting a job with spark-submit, I've observed delays (up to
>> >> 1--2 seconds) for the executors to respond to the driver in order to
>> >> receive tasks in the first stage. The delay does not persist once the
>> >> executors have been synchronized.
>> >>
>> >> When the tasks are very short, as may be your case (relatively small
>> >> data and a simple map task like you have described), the 8 tasks in
>> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> >> the second executor won't have responded to the master before the
>> >> first 4 tasks on the first executor have completed.
>> >>
>> >> To see if this is the cause in your particular case, you could try the
>> >> following to confirm:
>> >> 1. Examine the starting times of the tasks alongside their
>> >> executor
>> >> 2. Make a "dummy" stage execute before your real stages to
>> >> synchronize the executors by creating and materializing any random RDD
>> >> 3. Make the tasks longer, i.e. with some silly computational
>> >> work.
>> >>
>> >> Mike
>> >>
>> >>
>> >> On 4/17/16, Raghava Mutharaju  wrote:
>> >> > Yes its the same data.
>> >> >
>> >> > 1) The number of partitions are the same (8, which is an argument to
>> >> > the
>> >> > HashPartitioner). In the first case, these partitions are spread
>> across
>> >> > both the worker nodes. In the second case, all the partitions are on
>> >> > the
>> >> > same node.
>> >> > 2) What resources would be of interest here? Scala shell takes the
>> >> default
>> >> > parameters since we use "bin/spark-shell --master " to
>> run
>> >> the
>> >> > scala-shell. For the scala program, we do set some configuration
>> >> > options
>> >> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>> >> > serializer.
>> >> >
>> >> > We are running this on Azure D3-v2 machines which have 4 cores and
>> 14GB
>> >> > RAM.1 executor runs on each worker node. Following configuration
>> >> > options
>> >> > are set for the scala program -- perhaps we should move it to the
>> spark
>> >> > config file.
>> >> >
>> >> > Driver memory and executor memory are set to 12GB
>> >> > parallelism is set to 8
>> >> > Kryo serializer is used
>> >> > Number of retainedJobs and retainedStages has been increased to check
>> >> them
>> >> > in the UI.
>> >> >
>> >> > What information regarding Spark Context would be of interest here?
>> >> >
>> >> > Regards,
>> >> > Raghava.
>> >> >
>> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>> >> > wrote:
>> >> >
>> >> >> If the data file is same then it should have similar distribution of
>> >> >> keys.
>> >> >> Few queries-
>> >> >>
>> >> >> 1. Did you compare the number of partitions in both the cases?
>> >> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> >> Program being submitted?
>> >> >>
>> >> >> Also, can you please share the details of Spark Context, 
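
The point made earlier in this thread, that a HashPartitioner decides the partition of a key but not the node it lands on, can be illustrated with a small pure-Python sketch. It assumes integer keys from 1 to 55000 and 8 partitions, as reported above. `partition_for` is a hypothetical helper that mimics a non-negative hash modulo; it is not Spark's actual code.

```python
from collections import Counter

NUM_PARTITIONS = 8  # same value passed to the HashPartitioner in the thread


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Hypothetical stand-in for a hash partitioner: non-negative hash modulo."""
    # For small non-negative Python ints, hash(key) == key.
    return hash(key) % num_partitions


# Keys vary from 1 to around 55000, as reported in the thread.
counts = Counter(partition_for(k) for k in range(1, 55001))

for pid in sorted(counts):
    print(pid, counts[pid])
```

Under these assumptions every partition receives the same number of keys, so a skew toward one node would more likely come from where the tasks for those partitions get scheduled than from the partitioner itself.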

Re: How to add a custom jar file to the Spark driver?

2016-03-09 Thread Sonal Goyal
Hi Gerhard,

I just stumbled upon some documentation on EMR - link below. Seems there is
a -u option to add jars in S3 to your classpath, have you tried that ?

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-spark-configure.html


Best Regards,
Sonal

On Wed, Mar 9, 2016 at 11:50 AM, Wang, Daoyuan 
wrote:

> Hi Gerhard,
>
>
>
> How does EMR set its conf for spark? I think if you set both SPARK_CLASSPATH
> and spark.driver.extraClassPath, spark would ignore SPARK_CLASSPATH.
>
> I think you can do this by reading the configuration from SparkConf, then
> adding your custom settings to the corresponding key, and using the updated
> SparkConf to instantiate your SparkContext.
>
>
>
> Thanks,
>
> Daoyuan
>
>
>
> *From:* Gerhard Fiedler [mailto:gfied...@algebraixdata.com]
> *Sent:* Wednesday, March 09, 2016 5:41 AM
> *To:* user@spark.apache.org
> *Subject:* How to add a custom jar file to the Spark driver?
>
>
>
> We’re running Spark 1.6.0 on EMR, in YARN client mode. We run Python code,
> but we want to add a custom jar file to the driver.
>
>
>
> When running on a local one-node standalone cluster, we just use
> spark.driver.extraClassPath and everything works:
>
>
>
> spark-submit --conf spark.driver.extraClassPath=/path/to/our/custom/jar/*
>  our-python-script.py
>
>
>
> But on EMR, this value is set to something that is needed to make their
> installation of Spark work. Setting it to point to our custom jar
> overwrites the original setting rather than adding to it and breaks Spark.
>
>
>
> Our current workaround is to capture whatever EMR sets
> spark.driver.extraClassPath to once, then use that path and add our jar file
> to it. Of course this breaks when EMR changes this path in their cluster
> settings. We wouldn’t necessarily notice this easily. This is how it looks:
>
>
>
> spark-submit --conf
> spark.driver.extraClassPath=/path/to/our/custom/jar/*:/etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
>  our-python-script.py
>
>
>
> We prefer not to do this…
>
>
>
> We tried the spark-submit argument --jars, but it didn’t seem to do
> anything. Like this:
>
>
>
> spark-submit --jars /path/to/our/custom/jar/file.jar  our-python-script.py
>
>
>
> We also tried to set CLASSPATH, but it doesn’t seem to have any impact:
>
>
>
> export CLASSPATH=/path/to/our/custom/jar/*
>
> spark-submit  our-python-script.py
>
>
>
> When using SPARK_CLASSPATH, we got warnings that it is deprecated, and the
> messages also seemed to imply that it affects the same configuration that
> is set by spark.driver.extraClassPath.
>
>
>
>
>
> So, my question is: Is there a clean way to add a custom jar file to a
> Spark configuration?
>
>
>
> Thanks,
>
> Gerhard
>
>
>


Re: Understanding the Web_UI 4040

2016-03-07 Thread Sonal Goyal
Maybe check the worker logs to see what's going wrong with it?
On Mar 7, 2016 9:10 AM, "Angel Angel"  wrote:

> Hello Sir/Madam,
>
>
> I am running the spark-sql application on the cluster.
> In my cluster there are 3 slaves and one Master.
>
> When I looked at the progress of my application in the web UI at
> hadoopm0:8080,
>
> I found that one of my slave nodes is always in *LOADING* mode.
>
> Can you tell me what that means?
>
> Also, I am unable to see the DAG graph (when I click on the DAG graph, it
> hangs the screen for some time).
>
>


Re: Spark Mllib kmeans execution

2016-03-02 Thread Sonal Goyal
It will run distributed
On Mar 2, 2016 3:00 PM, "Priya Ch"  wrote:

> Hi All,
>
>   I am running k-means clustering algorithm. Now, when I am running the
> algorithm as -
>
> val conf = new SparkConf
> val sc = new SparkContext(conf)
> .
> .
> val kmeans = new KMeans()
> val model = kmeans.run(RDD[Vector])
> .
> .
> .
> The 'kmeans' object gets created on the driver. Now, does *kmeans.run()* get
> executed on each partition of the RDD in a distributed fashion, or is the
> entire RDD brought to the driver and then processed there?
>
> Thanks,
> Padma Ch
>
>
>


Re: Mllib Logistic Regression performance relative to Mahout

2016-03-01 Thread Sonal Goyal
You can also check if you are caching your input so that features are not
being read/computed every iteration.

Best Regards,
Sonal

On Mon, Feb 29, 2016 at 1:02 PM, Yashwanth Kumar 
wrote:

> Hi,
> If your features are numeric, try feature scaling and feed it to Spark
> Logistic Regression, It might increase rate%
>
>
>
>
>


Re: Getting prediction values in spark mllib

2016-02-11 Thread Sonal Goyal
Looks like you are doing binary classification and you are getting the
label out. If you clear the model threshold, you should be able to get the
raw score.
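
To make the difference between a thresholded label and a raw score concrete, here is a minimal pure-Python sketch. It is not MLlib code: the weights, intercept and feature vector are made up, and `raw_score` and `predict` are hypothetical helpers. In MLlib the equivalent step would be calling `clearThreshold()` on the model before `predict`.

```python
import math


def raw_score(weights, intercept, features):
    """Logistic score in (0, 1) for one feature vector."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-margin))


def predict(weights, intercept, features, threshold=0.5):
    """Return a 0.0/1.0 label when a threshold is set, else the raw score."""
    score = raw_score(weights, intercept, features)
    if threshold is None:
        return score
    return 1.0 if score > threshold else 0.0


weights, intercept = [0.8, -0.4], 0.1   # made-up model parameters
point = [1.0, 2.0]                      # made-up feature vector

print(predict(weights, intercept, point))                  # thresholded label
print(predict(weights, intercept, point, threshold=None))  # raw score
```

With a threshold in place the caller only ever sees 0.0 or 1.0; once it is cleared, the same call returns the underlying score.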
On Feb 11, 2016 1:32 PM, "Chandan Verma" 
wrote:

>
>
> Following is the code Snippet
>
>
>
>
>
> JavaRDD<Tuple2<Double, Double>> predictionAndLabels = data
>     .map(new Function<LabeledPoint, Tuple2<Double, Double>>() {
>         public Tuple2<Double, Double> call(LabeledPoint p) {
>             Double prediction = sameModel.predict(p.features());
>             return new Tuple2<Double, Double>(prediction, p.label());
>         }
>     });
>
>
>
> The line "sameModel.predict(p.features());" gives me the prediction as a
> double value (e.g. 0.0 or 1.0).
>
> How can I get the prediction value with more digits after the decimal point,
> e.g. 0.2344?
>
>


Re: How to efficiently Scan (not filter nor lookup) part of Paird RDD or Ordered RDD

2016-01-24 Thread Sonal Goyal
One thing you can also look at is to save your data in a way that can be
accessed through file patterns. Eg by hour, zone etc so that you only load
what you need.
On Jan 24, 2016 10:00 PM, "Ilya Ganelin"  wrote:

> The solution I normally use is to zipWithIndex() and then use the filter
> operation. Filter is an O(m) operation where m is the size of your
> partition, not an O(N) operation.
>
> -Ilya Ganelin
>
> On Sat, Jan 23, 2016 at 5:48 AM, Nirav Patel 
> wrote:
>
>> The problem is that I have an RDD of about 10M rows and it keeps growing.
>> Every time we want to query and compute on a subset of the data, we have to
>> use filter and then some aggregation. I know filter goes through every
>> partition and every row of the RDD, which may not be efficient at all.
>>
>> Since Spark has Ordered RDD functions, I don't see why it's so difficult to
>> implement such a function. Cassandra/HBase have had it for years: they can
>> fetch data only from certain partitions based on your rowkey. Scala TreeMap
>> has a range function to do the same.
>>
>> I think people have been looking for this for a while. I see several posts
>> asking about this.
>>
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-td20170.html#a26048
>>
>> By the way, I assume there
>> Thanks
>> Nirav
>>
>>
>>
>>
>
>
>


Re: merge 3 different types of RDDs in one

2015-12-01 Thread Sonal Goyal
I think you should be able to join different  rdds with same key. Have you
tried that?
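
As a rough illustration of what joining or cogrouping three keyed datasets produces, here is a pure-Python sketch. The `cogroup` function below is a hypothetical stand-in for the idea rather than Spark's implementation, and the customer data is made up.

```python
from collections import defaultdict


def cogroup(*keyed_datasets):
    """Group several lists of (key, value) pairs by key.

    Returns {key: [values_from_dataset_0, values_from_dataset_1, ...]}.
    """
    grouped = defaultdict(lambda: [[] for _ in keyed_datasets])
    for index, dataset in enumerate(keyed_datasets):
        for key, value in dataset:
            grouped[key][index].append(value)
    return dict(grouped)


# Made-up sample data keyed by customer id.
orders = [(1, "order-a"), (1, "order-b"), (2, "order-c")]
visits = [(1, "visit-x"), (3, "visit-y")]
profiles = [(1, "Alice"), (2, "Bob"), (3, "Carol")]

merged = cogroup(orders, visits, profiles)
for customer_id in sorted(merged):
    print(customer_id, merged[customer_id])
```

Each customer id ends up with one list per input dataset, and a missing side simply shows up as an empty list, so the value types of the three inputs do not need to match.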
On Dec 1, 2015 3:30 PM, "Praveen Chundi"  wrote:

> cogroup could be useful to you, since all three are PairRDD's.
>
>
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
>
> Best Regards,
> Praveen
>
>
> On 01.12.2015 10:47, Shams ul Haque wrote:
>
>> Hi All,
>>
>> I have made 3 RDDs from 3 different datasets. All RDDs are grouped by
>> CustomerID; 2 RDDs have values of Iterable type and one has a single
>> bean. All RDDs have an id of Long type as CustomerId. Below are the models
>> for the 3 RDDs:
>> JavaPairRDD
>> JavaPairRDD
>> JavaPairRDD
>>
>> Now, I have to merge all these 3 RDDs into a single one so that I can
>> generate an Excel report per customer using the data in the 3 RDDs.
>> I tried using the join transformation, but it needs RDDs of the same type
>> and it works for two RDDs.
>> So my questions are:
>> 1. Is there any way to do my merging task efficiently, so that I can
>> get all 3 datasets by CustomerId?
>> 2. If I merge the first two using the join transformation, do I need to run
>> groupByKey() before the join so that all data related to a single customer
>> will be on one node?
>>
>>
>> Thanks
>> Shams
>>
>
>
>
>


Re: Datastore for GrpahX

2015-11-22 Thread Sonal Goyal
For graphx, you should be able to read and write data from practically any
datastore Spark supports - flat files, rdbms, hadoop etc. If you want to
save your graph as it is, check something like Neo4j.

http://neo4j.com/developer/apache-spark/

Best Regards,
Sonal

On Sun, Nov 22, 2015 at 2:38 AM, Ilango Ravi  wrote:

> Hi
>
> I am trying to figure out which datastore I can use for storing data to be
> used with GraphX. Is there a good graph database out there that I can use
> for efficient storage and retrieval of graph data?
>
> thanks,
> ravi
>


Re: spark-submit stuck and no output in console

2015-11-17 Thread Sonal Goyal
I would suggest a couple of things to try

A. Try running the example program with master as local[*]. See if spark
can run locally or not.
B. Check spark master and worker logs.
C. Check if normal hadoop jobs can be run properly on the cluster.
D. Check spark master webui and see health of cluster.
On Nov 17, 2015 4:16 PM, "Kayode Odeyemi"  wrote:

> Sonal, SparkPi couldn't run as well. Stuck to the screen with no output
>
> hadoop-user@yks-hadoop-m01:/usr/local/spark$ ./bin/run-example SparkPi
>
> On Tue, Nov 17, 2015 at 12:22 PM, Steve Loughran 
> wrote:
>
>> 48 hours is one of those kerberos warning times (as is 24h, 72h and 7
>> days) 
>
>
> Does this mean I need to restart the whole Hadoop YARN cluster to reset
> kerberos?
>
>
>
>
> --
> Odeyemi 'Kayode O.
> http://ng.linkedin.com/in/kayodeodeyemi. t: @charyorde
>


Re: how can evenly distribute my records in all partition

2015-11-17 Thread Sonal Goyal
Think about how you want to distribute your data and how your keys are
spread currently. Do you want to compute something per day, per week etc.
Based on that, return a partition number. You could use mod 30 or some such
function to get the partitions.
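
A minimal sketch of the "mod 30" idea, in plain Python rather than as a Spark Partitioner subclass. It assumes the keys are epoch timestamps in seconds; `partition_for` and `bucket_seconds` are hypothetical names introduced only for illustration.

```python
NUM_PARTITIONS = 30


def partition_for(timestamp_seconds, bucket_seconds=1, num_partitions=NUM_PARTITIONS):
    """Map a timestamp to a partition number.

    bucket_seconds controls the granularity: 1 for per-second keys,
    86400 for per-day grouping, and so on.
    """
    bucket = timestamp_seconds // bucket_seconds
    return bucket % num_partitions


# Thirty made-up records with consecutive per-second timestamps.
timestamps = [1447800000 + i for i in range(30)]
assignments = [partition_for(ts) for ts in timestamps]
print(sorted(assignments))
```

This only gives one record per partition when the bucketed keys are consecutive. For irregular timestamps, a partition number derived from the record's position (for example an index attached beforehand) would be needed instead.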
On Nov 18, 2015 5:17 AM, "prateek arora"  wrote:

> Hi
> I am trying to implement custom partitioner using this link
> http://stackoverflow.com/questions/23127329/how-to-define-custom-partitioner-for-spark-rdds-of-equally-sized-partition-where
> ( in link example key value is from 0 to (noOfElement - 1))
>
> but I am not able to understand how to implement a custom partitioner in
> my case:
>
> My parent RDD has 4 partitions; the RDD key is a timestamp and the value is
> a JPEG byte array.
>
>
> Regards
> Prateek
>
>
> On Tue, Nov 17, 2015 at 9:28 AM, Ted Yu  wrote:
>
>> Please take a look at the following for example:
>>
>> ./core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
>> ./core/src/main/scala/org/apache/spark/Partitioner.scala
>>
>> Cheers
>>
>> On Tue, Nov 17, 2015 at 9:24 AM, prateek arora <
>> prateek.arora...@gmail.com> wrote:
>>
>>> Hi
>>> Thanks
>>> I am new to Spark development, so can you provide some help writing a
>>> custom partitioner to achieve this?
>>> If you have any link or example for writing a custom partitioner, please
>>> share it with me.
>>>
>>> On Mon, Nov 16, 2015 at 6:13 PM, Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
 You can write your own custom partitioner to achieve this

 Regards
 Sab
 On 17-Nov-2015 1:11 am, "prateek arora" 
 wrote:

> Hi
>
> I have an RDD with 30 records (key/value pairs) and am running 30
> executors. I want to repartition this RDD into 30 partitions so that every
> partition gets one record and is assigned to one executor.
>
> When I used rdd.repartition(30), it repartitioned my RDD into 30 partitions,
> but some partitions got 2 records, some got 1 record, and some got none.
>
> Is there any way in Spark to distribute my records evenly across all
> partitions?
>
> Regards
> Prateek
>
>
>
>
>
>>>
>>
>


Re: spark-submit stuck and no output in console

2015-11-17 Thread Sonal Goyal
How did the example spark jobs go? SparkPI etc..?

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Reifier at Strata Hadoop World
<http://strataconf.com/big-data-conference-sg-2015/public/schedule/detail/44606>
Reifier at Spark Summit 2015
<https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>

<http://in.linkedin.com/in/sonalgoyal>



On Tue, Nov 17, 2015 at 3:24 PM, Kayode Odeyemi <drey...@gmail.com> wrote:

> Thanks for the reply Sonal.
>
> I'm on JDK 7 (/usr/lib/jvm/java-7-oracle)
>
> My env is a YARN cluster made of 7 nodes (6 datanodes/
> node manager, 1 namenode/resource manager).
>
> On the namenode, is where I executed the spark-submit job while on one of
> the datanodes,  I executed 'hadoop fs -put /binstore /user/hadoop-user/'
> to dump 1TB of data into all the datanodes. That process is still running
> without hassle and it's only using 1.3 GB of 1.7g heap space.
>
> Initially, I submitted 2 jobs to the YARN cluster which was running for 2
> days and suddenly stops. Nothing in the logs shows the root cause.
>
>
> On Tue, Nov 17, 2015 at 11:42 AM, Sonal Goyal <sonalgoy...@gmail.com>
> wrote:
>
>> Could it be jdk related ? Which version are you on?
>>
>> Best Regards,
>> Sonal
>> Founder, Nube Technologies <http://www.nubetech.co>
>> Reifier at Strata Hadoop World
>> <http://strataconf.com/big-data-conference-sg-2015/public/schedule/detail/44606>
>> Reifier at Spark Summit 2015
>> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>>
>> <http://in.linkedin.com/in/sonalgoyal>
>>
>>
>>
>> On Tue, Nov 17, 2015 at 2:48 PM, Kayode Odeyemi <drey...@gmail.com>
>> wrote:
>>
>>> Anyone experienced this issue as well?
>>>
>>> On Mon, Nov 16, 2015 at 8:06 PM, Kayode Odeyemi <drey...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Or are you saying that the Java process never even starts?
>>>>
>>>>
>>>> Exactly.
>>>>
>>>> Here's what I got back from jstack as expected:
>>>>
>>>> hadoop-user@yks-hadoop-m01:/usr/local/spark/bin$ jstack 31316
>>>> 31316: Unable to open socket file: target process not responding or
>>>> HotSpot VM not loaded
>>>> The -F option can be used when the target process is not responding
>>>> hadoop-user@yks-hadoop-m01:/usr/local/spark/bin$ jstack 31316 -F
>>>> Attaching to core -F from executable 31316, please wait...
>>>> Error attaching to core file: Can't attach to the core file
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Odeyemi 'Kayode O.
>>> http://ng.linkedin.com/in/kayodeodeyemi. t: @charyorde
>>>
>>
>>
>
>
> --
> Odeyemi 'Kayode O.
> http://ng.linkedin.com/in/kayodeodeyemi. t: @charyorde
>


Re: spark-submit stuck and no output in console

2015-11-17 Thread Sonal Goyal
Could it be jdk related ? Which version are you on?

Best Regards,
Sonal

On Tue, Nov 17, 2015 at 2:48 PM, Kayode Odeyemi  wrote:

> Anyone experienced this issue as well?
>
> On Mon, Nov 16, 2015 at 8:06 PM, Kayode Odeyemi  wrote:
>
>>
>> Or are you saying that the Java process never even starts?
>>
>>
>> Exactly.
>>
>> Here's what I got back from jstack as expected:
>>
>> hadoop-user@yks-hadoop-m01:/usr/local/spark/bin$ jstack 31316
>> 31316: Unable to open socket file: target process not responding or
>> HotSpot VM not loaded
>> The -F option can be used when the target process is not responding
>> hadoop-user@yks-hadoop-m01:/usr/local/spark/bin$ jstack 31316 -F
>> Attaching to core -F from executable 31316, please wait...
>> Error attaching to core file: Can't attach to the core file
>>
>>
>>
>
>
> --
> Odeyemi 'Kayode O.
> http://ng.linkedin.com/in/kayodeodeyemi. t: @charyorde
>


Re: does spark ML have some thing like createDataPartition() in R caret package ?

2015-11-13 Thread Sonal Goyal
The RDD has a sample method where you can supply the flag for
replacement or not as well as the fraction to sample. takeSample is similar
but takes an absolute number of elements and returns them to the driver.
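
For illustration, the two operations asked about (a small random sample without replacement, and a train/test split) look roughly like this in plain Python. This is a local sketch on a made-up list, not Spark code; in PySpark the corresponding calls would be `rdd.sample(False, fraction, seed)` and `rdd.randomSplit([0.7, 0.3], seed)`. The helper names are hypothetical.

```python
import random


def sample_without_replacement(rows, fraction, seed=42):
    """Pick round(len(rows) * fraction) distinct rows at random."""
    rng = random.Random(seed)
    return rng.sample(rows, round(len(rows) * fraction))


def train_test_split(rows, train_fraction=0.7, seed=42):
    """Shuffle a copy of the rows and cut it into two disjoint parts."""
    rng = random.Random(seed)
    shuffled = list(rows)
    rng.shuffle(shuffled)
    cut = round(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]


rows = list(range(1000))  # made-up data set
subset = sample_without_replacement(rows, 0.05)
train, held_out = train_test_split(rows)
print(len(subset), len(train), len(held_out))
```

Note that Spark's fraction-based sampling is probabilistic, so the sample size there is only approximately the requested fraction, whereas this local sketch returns an exact count.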
On Nov 14, 2015 2:51 AM, "Andy Davidson" 
wrote:

> In R, it's easy to split a data set into training, cross-validation, and
> test sets. Is there something like this in spark.ml? I am using Python as of
> now.
>
> My real problem is that I want to randomly select a relatively small data set
> to do some initial data exploration. It's not clear to me how I could use
> Spark to create a random sample from a large data set. I would prefer to
> sample without replacement.
>
> I have not tried to use SparkR yet. I assume I would not be able to use
> the caret package with Spark ML.
>
> Kind regards
>
> Andy
>
> ```{R}
> inTrain <- createDataPartition(y=csv$classe, p=0.7, list=FALSE)
> trainSetDF <- csv[inTrain,]
> testSetDF <- csv[-inTrain,]
> ```
>
>


Re: Spark: How to find similar text title

2015-10-20 Thread Sonal Goyal
Do you want to compare within the rdd or do you have some external list or
data coming in ?

For matching, you could look at string edit distances or cosine similarity
if you are only comparing title strings.
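
Both measures are easy to try out locally before scaling them up. The sketch below is plain Python with hypothetical helper names: a Levenshtein edit distance and a cosine similarity over word counts. The scores it produces are not the ones quoted in the question, since those depend on the model used.

```python
import math
from collections import Counter


def edit_distance(a, b):
    """Levenshtein distance via the classic dynamic-programming recurrence."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(previous[j] + 1,          # deletion
                               current[j - 1] + 1,       # insertion
                               previous[j - 1] + cost))  # substitution
        previous = current
    return previous[-1]


def cosine_similarity(a, b):
    """Cosine of the angle between word-count vectors of two titles."""
    counts_a, counts_b = Counter(a.lower().split()), Counter(b.lower().split())
    dot = sum(counts_a[w] * counts_b[w] for w in counts_a)
    norm_a = math.sqrt(sum(c * c for c in counts_a.values()))
    norm_b = math.sqrt(sum(c * c for c in counts_b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


titles = ["About Spark Streaming", "About Spark MLlib", "Kafka Setup"]
query = "About Spark"
for title in sorted(titles, key=lambda t: cosine_similarity(query, t), reverse=True):
    print(title, round(cosine_similarity(query, title), 4), edit_distance(query, title))
```

Cosine similarity on word counts already falls in the 0 to 1 range asked for, while the edit distance would need normalising (for example by the longer title's length) to be comparable.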
On Oct 20, 2015 9:09 PM, "Ascot Moss"  wrote:

> Hi,
>
> I have my RDD that stores the titles of some articles:
> 1. "About Spark Streaming"
> 2. "About Spark MLlib"
> 3. "About Spark SQL"
> 4. "About Spark Installation"
> 5. "Kafka Streaming"
> 6. "Kafka Setup"
> 7. 
>
> I need to build a model to find titles by similarity,
> e.g
> if given "About Spark", hope to get:
>
> "About Spark Installation", 0.98622 (where 0.98622 is the score
> of similarity, range between 0 to 1)
> "About Spark MLlib", 0.95394
> "About Spark Streaming", 0.94332
> "About Spark SQL", 0.9111
>
> Any idea or reference to do so?
>
> Thanks
> Ascot
>
>
>
>
>
>  and need to find out similar titles
>


Re: In-memory computing and cache() in Spark

2015-10-18 Thread Sonal Goyal
Hi Jia,

RDDs are cached on the executor, not on the driver. I am assuming you are
running locally and haven't changed spark.executor.memory?

Sonal
On Oct 19, 2015 1:58 AM, "Jia Zhan"  wrote:

Does anyone have any clue what's going on? Why would caching with 2g memory be
much faster than with 15g memory?

Thanks very much!

On Fri, Oct 16, 2015 at 2:02 PM, Jia Zhan  wrote:

> Hi all,
>
> I am running Spark locally in one node and trying to sweep the memory size
> for performance tuning. The machine has 8 CPUs and 16G main memory, the
> dataset in my local disk is about 10GB. I have several quick questions and
> appreciate any comments.
>
> 1. Spark performs in-memory computing, but without using RDD.cache(), will
> anything be cached in memory at all? My guess is that, without RDD.cache(),
> only a small amount of data will be stored in OS buffer cache, and every
> iteration of computation will still need to fetch most data from disk every
> time, is that right?
>
> 2. To evaluate how caching helps with iterative computation, I wrote a
> simple program as shown below, which basically consists of one
> saveAsTextFile() and three reduce() actions/stages. I set
> "spark.driver.memory" to "15g" and left the others at their defaults. Then I
> ran three experiments.
>
>   val conf = new SparkConf().setAppName("wordCount")
>   val sc = new SparkContext(conf)
>   val input = sc.textFile("/InputFiles")
>   val words = input.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_).saveAsTextFile("/OutputFiles")
>   val ITERATIONS = 3
>   for (i <- 1 to ITERATIONS) {
>     val totallength = input.filter(line => line.contains("the")).map(s => s.length).reduce((a,b) => a+b)
>   }
>
> (I) The first run: no caching at all. The application finishes in ~12
> minutes (2.6min+3.3min+3.2min+3.3min)
>
> (II) The second run, I modified the code so that the input will be cached:
>  *val input = sc.textFile("/InputFiles").cache()*
>  The application finishes in ~11 mins!! (5.4min+1.9min+1.9min+2.0min)!
>  The storage page in Web UI shows 48% of the dataset  is cached, which
> makes sense due to large java object overhead, and
> spark.storage.memoryFraction is 0.6 by default.
>
> (III) However, the third run, same program as the second one, but I
> changed "spark.driver.memory" to be "2g".
>The application finishes in just 3.6 minutes (3.0min + 9s + 9s + 9s)!!
> And UI shows 6% of the data is cached.
>
> *From the results we can see the reduce stages finish in seconds, how
> could that happen with only 6% cached? Can anyone explain?*
>
> I am new to Spark and would appreciate any help on this. Thanks!
>
> Jia
>
>
>
>


-- 
Jia Zhan


Re: java.io.InvalidClassException using spark1.4.1 for Terasort

2015-10-14 Thread Sonal Goyal
This is probably a versioning issue, are you sure your code is compiling
and running against the same versions?
On Oct 14, 2015 2:19 PM, "Shreeharsha G Neelakantachar" <
shreeharsh...@in.ibm.com> wrote:

> Hi,
>   I have Terasort being executed on spark1.4.1 with hadoop 2.7 for a
> datasize of 1TB. When i change my os user from spark1 to hduser, i am
> observing below exception.  Please let me know what is wrong here. I tried
> to update scala-2.10 to 2.11 and compiled Terasort scala code using sbt.
> But nothing is helping here. I had created the spark1 (spark1 to spark4)
> user for a multi-tenancy run.
>
> 2015-10-13 03:42:54,840 [sparkDriver-akka.actor.default-dispatcher-2] INFO
>  org.apache.spark.scheduler.TaskSetManager - Starting task 199.0 in stage
> 0.0 (TID 199, 9.37.251.65, ANY, 1544 bytes)
> 2015-10-13 03:42:54,843 [task-result-getter-2] WARN
>  org.apache.spark.scheduler.TaskSetManager - Lost task 173.0 in stage 0.0
> (TID 173, 9.37.251.65): java.io.InvalidClassException:
> scala.reflect.ClassTag$$anon$1; local class incompatible: stream classdesc
> serialVersionUID = -4937928798201944954, local class serialVersionUID =
> -8102093212602380348
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Any help in this regard would be of great help.
>
> Regards
> Shreeharsha GN
>
>


Re: Web UI is not showing up

2015-09-01 Thread Sonal Goyal
The standalone master web UI is at port 8080. Port 4040 serves the
application UI, which shows up only while you have a running job; for
completed jobs you need the history server configured.
On Sep 1, 2015 8:57 PM, "Sunil Rathee"  wrote:

>
> Hi,
>
>
> localhost:4040 is not showing anything on the browser. Do we have to start
> some service?
>
> --
>
>
> Sunil Rathee
>
>
>
>


Re: Web UI is not showing up

2015-09-01 Thread Sonal Goyal
Is your master up? Check the java processes to see if they are running.

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Reifier covered in YourStory <http://yourstory.com/2015/08/deep-learning/>
Reifier at Spark Summit 2015
<https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>

<http://in.linkedin.com/in/sonalgoyal>



On Tue, Sep 1, 2015 at 9:06 PM, Sunil Rathee <ratheesunil...@gmail.com>
wrote:

> localhost:8080 is also not showing anything. Does some application running
> at the same time?
>
> On Tue, Sep 1, 2015 at 9:04 PM, Sonal Goyal <sonalgoy...@gmail.com> wrote:
>
>> The web ui is at port 8080. 4040 will show up something when you have a
>> running job or if you have configured history server.
>> On Sep 1, 2015 8:57 PM, "Sunil Rathee" <ratheesunil...@gmail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>>
>>> localhost:4040 is not showing anything on the browser. Do we have to
>>> start some service?
>>>
>>> --
>>>
>>>
>>> Sunil Rathee
>>>
>>>
>>>
>>>
>
>
> --
>
>
> Sunil Rathee
>
>
>
>


Re: Any quick method to sample rdd based on one filed?

2015-08-28 Thread Sonal Goyal
Filter into true rdd and false rdd. Union true rdd and sample of false rdd.
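
The same filter, sample and union steps can be sketched on a local list in plain Python. The function name and the data are made up; the ratio of 10 negatives per positive follows the question below.

```python
import random


def downsample_negatives(rows, negatives_per_positive=10, seed=7):
    """Keep every True row plus a random subset of the False rows."""
    rng = random.Random(seed)
    positives = [row for row in rows if row[1]]
    negatives = [row for row in rows if not row[1]]
    wanted = min(len(negatives), negatives_per_positive * len(positives))
    return positives + rng.sample(negatives, wanted)


# Made-up (String, Boolean) rows: 5 positives and 500 negatives.
rows = [("pos-%d" % i, True) for i in range(5)] + \
       [("neg-%d" % i, False) for i in range(500)]

result = downsample_negatives(rows)
print(sum(1 for _, flag in result if flag),
      sum(1 for _, flag in result if not flag))
```

On an RDD the negative count would come out only approximately at the target, because fraction-based sampling there is probabilistic. The fraction to pass would be roughly 10 times the positive count divided by the negative count.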
On Aug 28, 2015 2:57 AM, Gavin Yue yue.yuany...@gmail.com wrote:

 Hey,


 I have a RDD[(String,Boolean)]. I want to keep all Boolean: True rows and
 randomly keep some Boolean:false rows.  And hope in the final result, the
 negative ones could be 10 times more than positive ones.


 What would be most efficient way to do this?

 Thanks,






Re: Question on take function - Spark Java API

2015-08-26 Thread Sonal Goyal
You can try using wholeTextFiles which will give you a pair rdd of fileName,
content. flatMap through this and manipulate the content.
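
A minimal sketch of the per-file function such a flatMap could apply, written in plain Python so it can be tried without a cluster. It assumes the file layout described in the question below (asset name, then CSV header, then data rows). `parse_asset_file` is a hypothetical helper, not part of any Spark API.

```python
def parse_asset_file(path, content, delimiter=","):
    """Turn one (fileName, content) pair into a list of row dicts.

    The first line is the asset name, the second the CSV header and the
    remaining lines the data. The path is accepted only to match the
    (fileName, content) pair shape and is not used here.
    """
    lines = [line for line in content.splitlines() if line.strip()]
    asset = lines[0].strip()
    headers = lines[1].split(delimiter)
    records = []
    for line in lines[2:]:
        record = dict(zip(headers, line.split(delimiter)))
        record["asset"] = asset  # tag every reading with its own file's asset
        records.append(record)
    return records


# Made-up pair in the shape of the question's example file.
sample = ("file1.csv",
          "TestAsset01\n"
          "Time,dp_1,dp_2,dp_3\n"
          "11-01-2015 15:00:00,123,456,789\n"
          "11-01-2015 15:00:01,123,456,789\n")

records = parse_asset_file(*sample)
print(records[0])
```

Because each file arrives as one whole string, every record picks up the asset name from its own file. The trade-off is that each file has to fit in memory on a single executor.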

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015
https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/

http://in.linkedin.com/in/sonalgoyal



On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane pankaj.wah...@qiotec.com
wrote:

 Hi community members,


 Apache Spark is Fantastic and very easy to learn.. Awesome work!!!

 *Question:*

 I have multiple files in a folder, and the first line in each file is the
 name of the asset that the file belongs to. The second line is the CSV header
 row, and data starts from the third row.

 Ex: File 1

 TestAsset01
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,123,456,789
 11-01-2015 15:00:01,123,456,789
 . . .

 Ex: File 2

 TestAsset02
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,1230,4560,7890
 11-01-2015 15:00:01,1230,4560,7890
 . . .

 I have nearly 1000 files in each folder, about ~10G in size.

 I am using the Apache Spark Java API to read all these files.

 Following is the code extract that I am using:

 try (JavaSparkContext sc = new JavaSparkContext(conf)) {
     Map<String, String> readingTypeMap = getReadingTypesMap(sc);
     // Read File
     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
     // Get Asset
     String asset = data.take(1).get(0);
     // Extract Time Series Data
     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
     // Strip header
     String header = actualData.take(1).get(0);
     String[] headers = header.split(DELIMERTER);
     // Extract actual data
     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
     // Extract valid records
     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
     // Find Granularity
     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
     // Transform to TSD objects
     JavaRDD<TimeSeriesData> tsdFlatMap =
         transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);

     // Save to Cassandra
     javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
         "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();

     System.out.println("Total Records: " + timeSeriesLines.count());
     System.out.println("Valid Records: " + validated.count());
 }

 Within TimeSeriesData Object I need to set the asset name for the reading,
 so I need output of data.take(1) to be different for different files.

 Thank You.

 Best Regards,
 Pankaj







Re: Spark

2015-08-25 Thread Sonal Goyal
Sorry am I missing something? There is a method sortBy on both RDD and
PairRDD.


def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
Return this RDD sorted by the given key function.
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html




Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015
https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/

http://in.linkedin.com/in/sonalgoyal



On Tue, Aug 25, 2015 at 12:08 PM, Spark Enthusiast sparkenthusi...@yahoo.in
 wrote:

 But, there is no sort() primitive for an RDD. How do I sort?




Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?

2015-08-25 Thread Sonal Goyal
From what I have understood, you probably need to convert your vector to
breeze and do your operations there. Check
stackoverflow.com/questions/28232829/addition-of-two-rddmllib-linalg-vectors
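
For the toy use case in the question (translating a generated blob to another center), the operation itself is just an element-wise sum. Below is a minimal sketch in plain Python, with lists standing in for vectors; the names add_vectors and translate are made up for this example. In Scala, the same loop could run over the arrays returned by toArray and be wrapped back with Vectors.dense, or be delegated to Breeze as suggested above.

```python
def add_vectors(a, b):
    """Element-wise sum of two equal-length sequences of numbers."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    return [x + y for x, y in zip(a, b)]


def translate(points, offset):
    """Shift every point by the same offset vector (move a blob's center)."""
    return [add_vectors(p, offset) for p in points]
```

Subtraction follows the same pattern with x - y in place of x + y.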
On Aug 25, 2015 7:06 PM, Kristina Rogale Plazonic kpl...@gmail.com
wrote:

 Hi all,

 I'm still not clear what is the best (or, ANY) way to add/subtract
 two org.apache.spark.mllib.linalg.Vector objects in Scala.

 Ok, I understand there was a conscious Spark decision not to support
 linear algebra operations in Scala and leave it to the user to choose a
 linear algebra library.

 But, for any newcomer from R or Python, where you don't think twice about
 adding two vectors, it is such a productivity shot in the foot to have to
 write your own + operation. I mean, there is support in Spark for p-norm of
 Vectors, for sqdist between two Vectors, but not for +/-? As I said, I'm a
 newcomer to linear algebra in Scala and am not familiar with Breeze or
 apache.commons - I am willing to learn, but would really benefit from
 guidance from more experienced users. I am also not used to optimizing
 low-level code and am sure that any hack I do will be just horrible.

 So, please, could somebody point me to a blog post, documentation, or just
 patches for this really basic functionality. What do you do to get around
 it? Am I the only one to have a problem? (And, would it really be so
 onerous to add +/- to Spark? After all, even org.apache.spark.sql.Column
 class does have +,-,*,/  )

 My stupid little use case is to generate some toy data for Kmeans, and I
 need to translate a Gaussian blob to another center (for streaming and
 nonstreaming KMeans both).

 Many thanks! (I am REALLY embarrassed to ask such a simple question...)

 Kristina



Re: Spark

2015-08-24 Thread Sonal Goyal
I think you could try sorting the endPointsCount and then doing a take.
This should be a distributed process and only the result would get returned
to the driver.
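
To make the "only the result is returned" point concrete, here is a small sketch in plain Python of a distributed top-N, with a list of lists standing in for the partitions. The function name top_n is made up for this example. It follows the same per-partition heapq.nsmallest plus merge pattern that is visible in the takeOrdered traceback quoted below, so at most n items per partition ever need to leave a partition.

```python
import heapq


def top_n(partitions, n, key):
    """Return the n smallest items by `key` across all partitions."""
    # Step 1 (the mapPartitions part): each partition keeps only its own
    # n best candidates, however many rows it holds.
    partials = [heapq.nsmallest(n, part, key=key) for part in partitions]

    # Step 2 (the reduce part): merge the small candidate lists pairwise,
    # trimming back to n items after every merge.
    merged = []
    for candidates in partials:
        merged = heapq.nsmallest(n, merged + candidates, key=key)
    return merged
```

For example, top_n(parts, 10, key=lambda s: -s[1]) picks the ten (endpoint, count) pairs with the highest counts.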

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015
https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/

http://in.linkedin.com/in/sonalgoyal



On Tue, Aug 25, 2015 at 10:22 AM, Spark Enthusiast sparkenthusi...@yahoo.in
 wrote:

 I was running a Spark job to crunch a 9 GB Apache log file when I saw the
 following error:


 15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage
 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal):
 ExecutorLostFailure (executor 29 lost)
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 40), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 86), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 84), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 22), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 48), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 12), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove
 executor 29 from BlockManagerMaster.
 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block
 manager BlockManagerId(29,
 ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)

   .
   .
 Encountered Exception An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.collectAndServe.
 : org.apache.spark.SparkException: Job cancelled because SparkContext was
 shut down
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
 at
 org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
 at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
 at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
 at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143)

 .
 .

 Looking further, it seems like takeOrdered (called by my application) uses
 collect() internally and hence exhausts all the driver memory.

 line 361, in top10EndPoints
 topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
   File /home/hadoop/spark/python/pyspark/rdd.py, line 1174, in
 takeOrdered
 return self.mapPartitions(lambda it: [heapq.nsmallest(num, it,
 key)]).reduce(merge)
   File /home/hadoop/spark/python/pyspark/rdd.py, line 739, in reduce
 vals = self.mapPartitions(func).collect()
   File /home/hadoop/spark/python/pyspark/rdd.py, line 713, in collect
 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   File
 /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
 self.target_id, self.name)
   File
 /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line
 300, in get_return_value
 format(target_id, '.', name), value)

 How can I rewrite this code



 endpointCounts = (access_logs
                   .map(lambda log: (log.endpoint, 1))
                   .reduceByKey(lambda a, b: a + b))

 # endpointCounts is now an RDD of tuples: [(endpoint1, count1), (endpoint2, count2), ...]

 topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])


 so that this error does not happen?




Re: grpah x issue spark 1.3

2015-08-17 Thread Sonal Goyal
I have been using GraphX in production on 1.3 and 1.4 with no issues.
What's the exception you see and what are you trying to do?
On Aug 17, 2015 10:49 AM, dizzy5112 dave.zee...@gmail.com wrote:

 Hi using spark 1.3 and trying some sample code:


 when i run:

 all works well but with

 it falls over and i get a whole heap of errors:

 Is anyone else experiencing this? I've tried different graphs and always end
 up with the same results.

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/grpah-x-issue-spark-1-3-tp24292.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: All masters are unresponsive! Giving up.

2015-08-07 Thread Sonal Goyal
There seems to be a version mismatch somewhere. You can try to find out
the cause with debug serialization information. I think the JVM flag
-Dsun.io.serialization.extendedDebugInfo=true should help.

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015
https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/

http://in.linkedin.com/in/sonalgoyal



On Fri, Aug 7, 2015 at 4:42 AM, Jeff Jones jjo...@adaptivebiotech.com
wrote:

 I wrote a very simple Spark 1.4.1 app that I can run through a local
 driver program just fine using setMaster(“local[*]”).  The app is as
 follows:



 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD

 object Hello {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
     val sc = new SparkContext(conf)
     val data: RDD[Int] = sc.parallelize(Seq(1, 2, 12, 34, 2354, 123, 100), 2)
     println("Max: " + data.max)
     println("Min: " + data.min)
   }
 }



 I compile this using the following build.sbt which will pull the needed
 Spark libraries for me.



 name := "SparkyJeff"

 version := "1.0"

 scalaVersion := "2.11.6"

 // Change this to another test framework if you prefer
 libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-core" % "1.4.1",
     "org.apache.spark" %% "spark-sql"  % "1.4.1")

 // Uncomment to use Akka
 //libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.11"

 fork := true



 Now I’m trying to run this against a standalone cluster by changing
 setMaster(“local[*]”) to setMaster(“spark://p3.ourdomain.com:7077”). I
 downloaded Spark 1.4.1 for Hadoop 2.6 or greater, set
 SPARK_MASTER_IP=”p3.ourdomain.com”, SPARK_WORKER_CORES=”1000” and
 SPARK_WORKER_MEMORY=”500g”, and then started the cluster using start-all.sh.
 The cluster appears to start fine. I can hit the cluster UI at
 p3.ourdomain.com:8080 and see the same master URL as mentioned above.



 Now when I run my little app I get the following client error:



 …

 [error] 15/08/05 16:03:40 INFO AppClient$ClientActor: Connecting to master
 akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master...

 [error] 15/08/05 16:03:40 WARN ReliableDeliverySupervisor: Association
 with remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has
 failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 [error] 15/08/05 16:04:00 INFO AppClient$ClientActor: Connecting to master
 akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master...

 [error] 15/08/05 16:04:00 WARN ReliableDeliverySupervisor: Association
 with remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has
 failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 [error] 15/08/05 16:04:20 INFO AppClient$ClientActor: Connecting to master
 akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master...

 [error] 15/08/05 16:04:20 WARN ReliableDeliverySupervisor: Association
 with remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has
 failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 [error] 15/08/05 16:04:40 ERROR SparkDeploySchedulerBackend: Application
 has been killed. Reason: All masters are unresponsive! Giving up.

 …



 Looking into the master logs I find:



 15/08/06 22:52:28 INFO Master: akka.tcp://sparkDriver@192.168.137.41:48877
 got disassociated, removing it.

 15/08/06 22:52:46 ERROR Remoting: org.apache.spark.deploy.Command; local
 class incompatible: stream classdesc serialVersionUID =
 -7098307370860582211, local class serialVersionUID = -3335312719467547622

 java.io.InvalidClassException: org.apache.spark.deploy.Command; local
 class incompatible: stream classdesc serialVersionUID =
 -7098307370860582211, local class serialVersionUID = -3335312719467547622

 at
 java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)

 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)

 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)

 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

 at
 

Re: spark cluster setup

2015-08-03 Thread Sonal Goyal
Your master log files will be in the logs folder under the Spark home
directory on the master machine. Do they show an error?

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015
https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/

http://in.linkedin.com/in/sonalgoyal



On Mon, Aug 3, 2015 at 9:27 AM, Angel Angel areyouange...@gmail.com wrote:

 Hi,

 I have attached a snapshot of the console.
 Actually, I don't know how to see the master logs.
 Still, I have attached my master web UI.

 And these are the log file errors.




 2015-07-23 17:00:59,977 ERROR
 org.apache.spark.scheduler.ReplayListenerBus: Malformed line: not started


 2015-07-23 17:01:00,096 INFO org.eclipse.jetty.server.Server:
 jetty-8.y.z-SNAPSHOT

 2015-07-23 17:01:00,138 INFO org.eclipse.jetty.server.AbstractConnector:
 Started SelectChannelConnector@0.0.0.0:18088

 2015-07-23 17:01:00,138 INFO org.apache.spark.util.Utils: Successfully
 started service on port 18088.

 2015-07-23 17:01:00,140 INFO
 org.apache.spark.deploy.history.HistoryServer: Started HistoryServer at
 http://hadoopm0:18088

 2015-07-24 11:36:18,148 INFO org.apache.spark.SecurityManager: Changing
 view acls to: spark

 2015-07-24 11:36:18,148 INFO org.apache.spark.SecurityManager: Changing
 modify acls to: spark

 2015-07-24 11:36:18,148 INFO org.apache.spark.SecurityManager:
 SecurityManager: authentication disabled; ui acls disabled; users with view
 permissions: Set(spark); users with modify permissions: Set(spark)

 2015-07-24 11:36:18,367 INFO org.apache.spark.SecurityManager: Changing
 acls enabled to: false

 2015-07-24 11:36:18,367 INFO org.apache.spark.SecurityManager: Changing
 admin acls to:

 2015-07-24 11:36:18,368 INFO org.apache.spark.SecurityManager: Changing
 view acls to: root


 Thanks.








Re: spark cluster setup

2015-08-02 Thread Sonal Goyal
What do the master logs show?

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015
https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/

http://in.linkedin.com/in/sonalgoyal



On Mon, Aug 3, 2015 at 7:46 AM, Angel Angel areyouange...@gmail.com wrote:

 Hello Sir,

 I have installed Spark.

 The local spark-shell is working fine.

 But whenever I try the master configuration, I get errors.

 When I run this command:

 MASTER=spark://hadoopm0:7077 spark-shell

 I get errors like:



 15/07/27 21:17:26 INFO AppClient$ClientActor: Connecting to master
 spark://hadoopm0:7077...

 15/07/27 21:17:46 ERROR SparkDeploySchedulerBackend: Application has been
 killed. Reason: All masters are unresponsive! Giving up.

 15/07/27 21:17:46 WARN SparkDeploySchedulerBackend: Application ID is not
 initialized yet.

 15/07/27 21:17:46 ERROR TaskSchedulerImpl: Exiting due to error from
 cluster scheduler: All masters are unresponsive! Giving up.



 Also, I have attached a screenshot of the master UI.

 I have also tested using the telnet command:

 It shows that hadoopm0 is connected.

 Can you please give me some references or documentation on how to solve
 this issue?

 Thanks in advance.

 Thanking You,





Re: many-to-many join

2015-07-22 Thread Sonal Goyal
If I understand this correctly, you could join area_code_user and
area_code_state and then flatMap to get
(user, areacode, state). Then group by or reduce by user.

You can also try some join optimizations, like partitioning on area code or
broadcasting the smaller table, depending on the size of area_code_state.
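
A minimal sketch of the broadcast variant in plain Python, with lists standing in for the two RDDs. The function name users_to_states is made up for this example, and it assumes area_code_state is small enough to hold as a lookup table, which is what broadcasting it would amount to. The large user table is then flat-mapped against that lookup and grouped by user.

```python
from collections import defaultdict


def users_to_states(area_code_users, area_code_states):
    """Map each user id to the sorted list of states implied by its area codes.

    area_code_users:  iterable of (area_code, [user_id, ...])
    area_code_states: iterable of (area_code, [state, ...])
    """
    # The small table becomes an in-memory lookup (the "broadcast" side).
    states_by_code = defaultdict(set)
    for code, states in area_code_states:
        states_by_code[code].update(states)

    # Flat-map the large table into (user, state) pairs and group by user.
    # Using a set per user removes duplicates such as a repeated state.
    result = defaultdict(set)
    for code, users in area_code_users:
        for user in users:
            for state in states_by_code.get(code, ()):
                result[user].add(state)

    return {user: sorted(states) for user, states in result.items()}
```

Emitting one (user, state) pair at a time avoids building a mixed list of user ids and state names that has to be untangled afterwards.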
On Jul 22, 2015 10:15 AM, John Berryman jo...@eventbrite.com wrote:

 Quick example problem that's stumping me:

 * Users have 1 or more phone numbers and therefore one or more area codes.
 * There are 100M users.
 * States have one or more area codes.
 * I would like to find the states for the users (as indicated by phone area
 code).

 I was thinking about something like this:

 If area_code_user looks like (area_code,[user_id]) ex: (615,[1234567])
 and area_code_state looks like (area_code,state) ex: (615, [Tennessee])
 then we could do

 states_and_users_mixed = area_code_user.join(area_code_state) \
 .reduceByKey(lambda a,b: a+b) \
 .values()

 user_state_pairs = states_and_users_mixed.flatMap(
 emit_cartesian_prod_of_userids_and_states )
 user_to_states =   user_state_pairs.reduceByKey(lambda a,b: a+b)

 user_to_states.first()

  (1234567,[Tennessee,Tennessee,California])

 This would work, but the user_state_pairs is just a list of user_ids and
 state names mixed together and emit_cartesian_prod_of_userids_and_states
 has to correctly pair them. This is problematic because 1) it's weird and
 sloppy and 2) there will be lots of users per state and having so many
 users in a single row is going to make
 emit_cartesian_prod_of_userids_and_states work extra hard to first locate
 states and then emit all userid-state pairs.

 How should I be doing this?

 Thanks,
 -John



Re: ReduceByKey with a byte array as the key

2015-06-11 Thread Sonal Goyal
I think if you wrap the byte[] in an object and implement the equals and
hashCode methods, you may be able to do this. There will be the overhead of
an extra object, but conceptually it should work unless I am missing
something.
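
The wrapper idea can be sketched as follows, here in plain Python with a mutable bytearray playing the role of byte[]. The names ByteKey and reduce_by_key are made up for this example, and the small dictionary loop only stands in for reduceByKey. In Java, the equals and hashCode of such a wrapper would typically delegate to Arrays.equals and Arrays.hashCode.

```python
class ByteKey:
    """Hashable wrapper that compares the wrapped bytes by content."""

    __slots__ = ("_data",)

    def __init__(self, data):
        # Take an immutable copy so the key cannot change after hashing.
        self._data = bytes(data)

    def __eq__(self, other):
        return isinstance(other, ByteKey) and self._data == other._data

    def __hash__(self):
        return hash(self._data)


def reduce_by_key(pairs, func):
    """Local stand-in for reduceByKey: combine values that share a key."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return out
```

Two keys built from separate arrays with the same contents now land in the same group, because equality and hashing depend only on the bytes.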

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015
https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/

http://in.linkedin.com/in/sonalgoyal



On Thu, Jun 11, 2015 at 9:27 PM, Mark Tse mark@d2l.com wrote:

  I would like to work with RDD pairs of Tuple2<byte[], obj>, but byte[]s
 with the same contents are considered different values because their
 reference values are different.



 I didn't see any way to pass in a custom comparer. I could convert the byte[]
 into a String with an explicit charset, but I'm wondering if there's a more
 efficient way.



 Also posted on SO: http://stackoverflow.com/q/30785615/2687324



 Thanks,

 Mark



Re: How to process data in chronological order

2015-05-21 Thread Sonal Goyal
Would partitioning your data based on the key and then running
mapPartitions help?
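
One way to picture this is the sketch below, in plain Python with a list standing in for the RDD. The function name chronological_reduce and the num_sections parameter are made up for this example. It assumes the combine function is associative, so contiguous time ranges can be reduced separately and the partial results combined in order. In Spark, the analogous steps would be a sortByKey (which range-partitions by key), a mapPartitions that folds each partition, and a final in-order combine of the collected partials.

```python
from functools import reduce


def chronological_reduce(records, combine, num_sections=4):
    """Reduce (timestamp, value) records in timestamp order.

    `combine` must be associative; it does not need to be commutative.
    """
    ordered = sorted(records, key=lambda kv: kv[0])
    if not ordered:
        return None

    # Split the sorted data into contiguous sections (ceiling division),
    # mimicking range partitions that each cover one time interval.
    size = -(-len(ordered) // num_sections)
    sections = [ordered[i:i + size] for i in range(0, len(ordered), size)]

    # Reduce each section on its own (the per-partition step) ...
    partials = [reduce(combine, (value for _, value in section))
                for section in sections]

    # ... then combine the partial results in chronological order.
    return reduce(combine, partials)
```

With string concatenation as the combine function, the result spells out the values in timestamp order, which makes the ordering easy to check.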

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Thu, May 21, 2015 at 4:33 AM, roy rp...@njit.edu wrote:

 I have a key-value RDD, key is a timestamp (femto-second resolution, so
 grouping buys me nothing) and I want to reduce it in the chronological
 order.

 How do I do that in spark?

 I am fine with reducing contiguous sections of the set separately and then
 aggregating the resulting objects locally.

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-data-in-chronological-order-tp22966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: dependencies on java-netlib and jblas

2015-05-08 Thread Sonal Goyal
Hi John,

I have been using MLlib without installing the native jblas dependency.
Functionally, I have not run into problems. I still need to explore whether
there are any performance hits.

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Fri, May 8, 2015 at 9:34 PM, John Niekrasz john.niekr...@gmail.com
wrote:

 Newbie question...

 Can I use any of the main ML capabilities of MLlib in a Java-only
 environment, without any native library dependencies?

 According to the documentation, java-netlib provides a JVM fallback. This
 suggests that native netlib libraries are not required.

 It appears that such a fallback is not available for jblas. However, a quick
 look at the MLlib source suggests that MLlib's dependencies on jblas are
 rather isolated:

  grep -R jblas
 main/scala/org/apache/spark/ml/recommendation/ALS.scala:import org.jblas.DoubleMatrix
 main/scala/org/apache/spark/mllib/optimization/NNLS.scala:import org.jblas.{DoubleMatrix, SimpleBlas}
 main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala:import org.jblas.DoubleMatrix
 main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:import org.jblas.DoubleMatrix
 main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala: org.jblas.util.Random.seed(42)
 main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala:import org.jblas.DoubleMatrix
 main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala:import org.jblas.DoubleMatrix

 Is it true or false that many of MLlib's capabilities will work perfectly
 fine without any native (non-Java) libraries installed at all?

 Thanks for the help,
 John



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dependencies-on-java-netlib-and-jblas-tp22818.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Use with Data justifying Spark

2015-04-01 Thread Sonal Goyal
Maybe check the examples?

http://spark.apache.org/examples.html

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Wed, Apr 1, 2015 at 8:31 PM, Vila, Didier didier.v...@teradata.com
wrote:

  Good Morning All,



 I would like to use Spark on a special “synthetic” case that justifies
 the use of Spark.

 So I am looking for a case based on data (it can be big) and, if possible,
 the associated Java and/or Python code.

 It would be fantastic if you could point me to a link where I can load this case!



 Didier



Re: [Graphx Spark] Error of Lost executor and TimeoutException

2015-02-02 Thread Sonal Goyal
That may be the cause of your issue. Take a look at the tuning guide[1] and
maybe also profile your application. See if you can reuse your objects.

1. http://spark.apache.org/docs/latest/tuning.html


Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Sat, Jan 31, 2015 at 4:21 AM, Yifan LI iamyifa...@gmail.com wrote:

 Yes, I think so, esp. for a pregel application… have any suggestion?

 Best,
 Yifan LI





 On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com wrote:

 Is your code hitting frequent garbage collection?

 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co/

 http://in.linkedin.com/in/sonalgoyal



 On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com wrote:




 Hi,

 I am running my GraphX application on Spark 1.2.0 (11-node cluster). It
 requests 30GB of memory per node and 100 cores for an input dataset of around
 1GB (a graph with 5 million vertices).

 But the error below always happens…

 Could anyone give me some pointers?

 (BTW, the overall edge/vertex RDDs will reach more than 100GB during
 graph computation, and another version of my application works well on
 the same dataset while needing much less memory during computation.)

 Thanks in advance!!!


 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
   at scala.concurrent.Await$.result(package.scala:107)
   at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137)
   at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227)
   at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
   at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
   at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
   at scala.Option.foreach(Option.scala:236)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
   at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133)
   at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
 [Stage 91:===  (2 + 4) / 6]
 15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0
 [Stage 93:  (29 + 20) / 49]
 15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on small11-tap1.common.lip6.fr: remote Akka client disassociated
 [Stage 83:   (1 + 0) / 6][Stage 86:   (0 + 1) / 2][Stage 88:   (0 + 2) / 8]
 15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9
 [Stage 83:===  (5 + 1) / 6][Stage 88:=   (9 + 2) / 11]
 15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on small10-tap1.common.lip6.fr: remote Akka client disassociated
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8

 Best,
 Yifan LI











Re: [Graphx Spark] Error of Lost executor and TimeoutException

2015-01-30 Thread Sonal Goyal
Is your code hitting frequent garbage collection?

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com wrote:




 Hi,

 I am running my GraphX application on Spark 1.2.0 (11-node cluster). It
 requests 30GB of memory per node and 100 cores for an input dataset of around
 1GB (a graph with 5 million vertices).

 But the error below always happens…

 Could anyone give me some pointers?

 (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph
 computation, and another version of my application works well on the
 same dataset while needing much less memory during computation.)

 Thanks in advance!!!


 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
   at scala.concurrent.Await$.result(package.scala:107)
   at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137)
   at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227)
   at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
   at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
   at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
   at scala.Option.foreach(Option.scala:236)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
   at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
   at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133)
   at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
 [Stage 91:===  (2 + 4) / 6]
 15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0
 [Stage 93:  (29 + 20) / 49]
 15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on small11-tap1.common.lip6.fr: remote Akka client disassociated
 [Stage 83:   (1 + 0) / 6][Stage 86:   (0 + 1) / 2][Stage 88:   (0 + 2) / 8]
 15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9
 [Stage 83:===  (5 + 1) / 6][Stage 88:=   (9 + 2) / 11]
 15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on small10-tap1.common.lip6.fr: remote Akka client disassociated
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8

 Best,
 Yifan LI









Re: MLLib in Production

2014-12-10 Thread Sonal Goyal
You can also serialize the model and use it in other places.

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Wed, Dec 10, 2014 at 5:32 PM, Yanbo Liang yanboha...@gmail.com wrote:

 Hi Klaus,

 There is no ideal method, but there are some workarounds.
 Train the model on a Spark cluster or YARN cluster, then use RDD.saveAsTextFile
 to store the model, which includes the weights and intercept, to HDFS.
 Load the weights file and intercept file from HDFS, construct a GLM model, and
 then run the model.predict() method to get what you want.

 The Spark community also has some ongoing work on exporting models with
 PMML.
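The save-then-reload workaround described above can be sketched in plain Python. This is only an illustration under assumptions: the JSON file layout, the function names, and the example weights are hypothetical, and a real deployment would read the file from HDFS rather than a local temp directory.

```python
import json
import os
import tempfile


def save_linear_model(path, weights, intercept):
    """Write the weights and intercept of a trained linear model as JSON."""
    with open(path, "w") as f:
        json.dump({"weights": list(weights), "intercept": float(intercept)}, f)


def load_linear_model(path):
    """Read back the (weights, intercept) pair written by save_linear_model."""
    with open(path) as f:
        params = json.load(f)
    return params["weights"], params["intercept"]


def predict(weights, intercept, features):
    """Linear prediction: dot(weights, features) + intercept."""
    return sum(w * x for w, x in zip(weights, features)) + intercept


# Hypothetical values standing in for a trained model's weights and intercept.
model_path = os.path.join(tempfile.mkdtemp(), "model.json")
save_linear_model(model_path, [0.5, -2.0], 1.0)
loaded_weights, loaded_intercept = load_linear_model(model_path)
score = predict(loaded_weights, loaded_intercept, [2.0, 1.0])
```

The serving side (a servlet, for instance) then only needs the small parameter file and the dot product, not a Spark cluster.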

 2014-12-10 18:32 GMT+08:00 Simon Chan simonc...@gmail.com:

 Hi Klaus,

 PredictionIO is an open source product based on Spark MLlib for exactly
 this purpose.
 This is the tutorial for classification in particular:
 http://docs.prediction.io/classification/quickstart/

 You can add custom serving logic and retrieve prediction results through
 the REST API/SDKs from other places.

 Simon


 On Wed, Dec 10, 2014 at 2:25 AM, Klausen Schaefersinho 
 klaus.schaef...@gmail.com wrote:

 Hi,


 I would like to use Spark to train a model, but use the model in some
 other place, e.g. a servlet to do some classification in real time.

 What is the best way to do this? Can I just copy a model file or
 something and load it in the servlet? Can anybody point me to a good
 tutorial?


 Cheers,


 Klaus



 --
 “Overfitting” is not about an excessive amount of physical exercise...






Re: Which function in spark is used to combine two RDDs by keys

2014-11-13 Thread Sonal Goyal
Check cogroup.

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Thu, Nov 13, 2014 at 5:11 PM, Blind Faith person.of.b...@gmail.com
wrote:

 Let us say I have the following two RDDs, with the following key-value
 pairs.

 rdd1 = [ (key1, [value1, value2]), (key2, [value3, value4]) ]

 and

 rdd2 = [ (key1, [value5, value6]), (key2, [value7]) ]

 Now, I want to join them by key values, so for example I want to return
 the following

 ret = [ (key1, [value1, value2, value5, value6]), (key2, [value3,
 value4, value7]) ]

 How can I do this in Spark using Python or Scala? One way is to use
 join, but join would create a tuple inside the tuple. But I want to only
 have one tuple per key-value pair.
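A minimal sketch of the cogroup suggestion, assuming PySpark pair RDDs whose values are lists as in the example above. The helper names merge_cogrouped and combine_by_key are hypothetical; cogroup and mapValues are the RDD methods being relied on.

```python
def merge_cogrouped(groups):
    """Flatten cogroup output for one key into a single list.

    `groups` is a tuple with one iterable per input RDD; each iterable
    holds the values for the key, and here every value is itself a list.
    """
    return [item for group in groups for value in group for item in value]


def combine_by_key(rdd1, rdd2):
    """Return an RDD of (key, merged_list); expects PySpark pair RDDs."""
    # cogroup yields (key, (values_from_rdd1, values_from_rdd2)).
    return rdd1.cogroup(rdd2).mapValues(merge_cogrouped)


# What merge_cogrouped receives for key1 in the example above.
merged_key1 = merge_cogrouped(([["value1", "value2"]], [["value5", "value6"]]))
```

Unlike join, this gives one flat list per key, and a key that is missing from one RDD simply contributes an empty group.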



Re: Confused why I'm losing workers/executors when writing a large file to S3

2014-11-13 Thread Sonal Goyal
Hi Darin,

In our case, we were getting the error due to long GC pauses in our app.
Fixing the underlying code helped us remove this error. This is also
mentioned as point 1 in the link below:

http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/%3cca+-p3ah5aamgtke6viycwb24ohsnmaqm1q9x53abwb_arvw...@mail.gmail.com%3E


Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Fri, Nov 14, 2014 at 1:01 AM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:

 For one of my Spark jobs, my workers/executors are dying and leaving the
 cluster.

 On the master, I see something like the following in the log file.  I'm
 surprised to see the '60' seconds in the master log below because I
 explicitly set it to '600' (or so I thought) in my spark job (see below).
 This is happening at the end of my job when I'm trying to persist a large
 RDD (probably around 300+GB) back to S3 (in 256 partitions).  My cluster
 consists of 6 r3.8xlarge machines.  The job successfully works when I'm
 outputting 100GB or 200GB.

 If  you have any thoughts/insights, it would be appreciated.

 Thanks.

 Darin.

 Here is where I'm setting the 'timeout' in my spark job.

 SparkConf conf = new SparkConf()
     .setAppName("SparkSync Application")
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.rdd.compress", "true")
     .set("spark.core.connection.ack.wait.timeout", "600");

 On the master, I see the following in the log file.

 14/11/13 17:20:39 WARN master.Master: Removing
 worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 because we got no
 heartbeat in 60 seconds
 14/11/13 17:20:39 INFO master.Master: Removing worker
 worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 on
 ip-10-35-184-232.ec2.internal:51877
 14/11/13 17:20:39 INFO master.Master: Telling app of lost executor: 2

 On a worker, I see something like the following in the log file.

 14/11/13 17:20:58 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
   at scala.concurrent.Await$.result(package.scala:107)
   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362)
 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: I/O exception
 (java.net.SocketException) caught when processing request: Broken pipe
 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:32 INFO httpclient.HttpMethodDirector: I/O exception
 (java.net.SocketException) caught when processing request: Broken pipe
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:58 WARN utils.RestUtils: Retried connection 6 times, which
 exceeds the maximum retry count of 5
 14/11/13 17:21:58 WARN utils.RestUtils: Retried connection 6 

Re: Best practice for multi-user web controller in front of Spark

2014-11-11 Thread Sonal Goyal
I believe the Spark Job Server by Ooyala can help you share data across
multiple jobs, take a look at
http://engineering.ooyala.com/blog/open-sourcing-our-spark-job-server. It
seems to fit closely to what you need.

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Tue, Nov 11, 2014 at 7:20 PM, bethesda swearinge...@mac.com wrote:

 We are relatively new to Spark and so far have been manually submitting
 single jobs at a time for ML training, during our development process, using
 spark-submit.  Each job accepts a small user-submitted data set and compares
 it to every data set in our HDFS corpus, which only changes incrementally on
 a daily basis.  (That detail is relevant to question 3 below.)

 Now we are ready to start building out the front-end, which will allow a
 team of data scientists to submit their problems to the system via a web
 front-end (web tier will be Java).  Users could of course be submitting jobs
 more or less simultaneously.  We want to make sure we understand how to best
 structure this.

 Questions:

 1 - Does a new SparkContext get created in the web tier for each new request
 for processing?

 2 - If so, how much time should we expect it to take for setting up the
 context?  Our goal is to return a response to the users in under 10 seconds,
 but if it takes many seconds to create a new context or otherwise set up the
 job, then we need to adjust our expectations for what is possible.  From
 using spark-shell one might conclude that it might take more than 10 seconds
 to create a context, however it's not clear how much of that is
 context-creation vs other things.

 3 - (This last question perhaps deserves a post in and of itself:) if every
 job is always comparing some little data structure to the same HDFS corpus
 of data, what is the best pattern to use to cache the RDD's from HDFS so
 they don't have to always be re-constituted from disk?  I.e. how can RDD's
 be shared from the context of one job to the context of subsequent jobs?
 Or does something like memcache have to be used?

 Thanks!
 David



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-multi-user-web-controller-in-front-of-Spark-tp18581.html




Re: Does spark works on multicore systems?

2014-11-09 Thread Sonal Goyal
Also, the level of parallelism would be affected by how big your input is.
Could this be a problem in your  case?

On Sunday, November 9, 2014, Aaron Davidson ilike...@gmail.com wrote:

 oops, meant to cc userlist too

 On Sat, Nov 8, 2014 at 3:13 PM, Aaron Davidson ilike...@gmail.com wrote:

 The default local master is local[*], which should use all cores on
 your system. So you should be able to just do ./bin/pyspark and
 sc.parallelize(range(1000)).count() and see that all your cores were used.

 On Sat, Nov 8, 2014 at 2:20 PM, Blind Faith person.of.b...@gmail.com wrote:

 I am a Spark newbie and I use python (pyspark). I am trying to run a
 program on a 64 core system, but no matter what I do, it always uses 1
 core. It doesn't matter if I run it using spark-submit --master local[64]
 run.sh or I call x.repartition(64) in my code with an RDD, the spark
 program always uses one core. Has anyone experience of running spark
 programs on multicore processors with success? Can someone provide me a
 very simple example that does properly run on all cores of a multicore
 system?





-- 
Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal


Re: Submiting Spark application through code

2014-10-31 Thread Sonal Goyal
What do your worker logs say?

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Fri, Oct 31, 2014 at 11:44 AM, sivarani whitefeathers...@gmail.com
wrote:

 I tried running it but it didn't work

 public static final SparkConf batchConf = new SparkConf();
 String master = "spark://sivarani:7077";
 String spark_home = "/home/sivarani/spark-1.0.2-bin-hadoop2/";
 String jar = "/home/sivarani/build/Test.jar";
 public static final JavaSparkContext batchSparkContext = new
 JavaSparkContext(master, "SparkTest", spark_home, new String[] {jar});

 public static void main(String args[]) {
     runSpark(0, "TestSubmit");
 }

 public static void runSpark(int crit, String dataFile) {
     JavaRDD<String> logData = batchSparkContext.textFile(input, 10);
     // flatMap, mapToPair and reduceByKey steps (elided in the original post)
     List<Tuple2<String, Integer>> output1 = counts.collect();
 }


 This works fine with spark-submit, but when I tried to submit through code
 LeadBatchProcessing.runSpark(0, "TestSubmit.csv");

 I get the following error

 HTTP Status 500 - javax.servlet.ServletException:
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0:0 failed 4 times, most recent failure: TID 29 on host 172.18.152.36
 failed for unknown reason
 Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent
 failure: TID 29 on host 172.18.152.36 failed for unknown reason Driver
 stacktrace:



 Any Advice on this?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Submiting-Spark-application-through-code-tp17452p17797.html




Re: Using a Database to persist and load data from

2014-10-31 Thread Sonal Goyal
I think you can try to use the Hadoop DBOutputFormat

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Fri, Oct 31, 2014 at 1:00 PM, Kamal Banga ka...@sigmoidanalytics.com
wrote:

 You can also use PairRDDFunctions' saveAsNewAPIHadoopFile that takes an
 OutputFormat class.
 So you will have to write a custom OutputFormat class that extends
 OutputFormat. In this class, you will have to implement a getRecordWriter
 which returns a custom RecordWriter.
 So you will also have to write a custom RecordWriter which extends
 RecordWriter which will have a write method that actually writes to the DB.

 On Fri, Oct 31, 2014 at 11:25 AM, Yanbo Liang yanboha...@gmail.com
 wrote:

 AFAIK, you can read data from a DB with JdbcRDD, but there is no interface
 for writing to a DB.
 JdbcRDD has some restrictions, such as requiring the SQL to have a where clause.
 For writing to a DB, you can implement it with mapPartitions or
 foreachPartition.
 You can refer to this example:

 http://stackoverflow.com/questions/24916852/how-can-i-connect-to-a-postgresql-database-into-apache-spark-using-scala
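A rough sketch of the foreachPartition approach to writing, with one connection opened per partition rather than per record. Everything here is an assumption for illustration: sqlite3 stands in for the real database driver (Vertica, PostgreSQL, ...), and the table name, schema, and helper names are hypothetical. With a real cluster the target would be a database server reachable from every executor, not a local file.

```python
import os
import sqlite3
import tempfile

# Hypothetical target database; sqlite3 is only a stand-in for a real driver.
DB_PATH = os.path.join(tempfile.mkdtemp(), "sketch.db")


def write_partition(rows):
    """Insert every (name, value) row of one partition over a single connection."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS results (name TEXT, value INTEGER)")
        conn.executemany("INSERT INTO results VALUES (?, ?)", rows)
        conn.commit()
    finally:
        conn.close()


def save_rdd(rdd):
    """With PySpark, run write_partition once per partition on the executors."""
    rdd.foreachPartition(write_partition)


# Local stand-in for two partitions, no Spark required.
for partition in ([("a", 1), ("b", 2)], [("c", 3)]):
    write_partition(iter(partition))

check = sqlite3.connect(DB_PATH)
row_count = check.execute("SELECT COUNT(*) FROM results").fetchone()[0]
check.close()
```

Opening the connection inside the partition function matters because connections generally cannot be serialized from the driver to the executors.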

 2014-10-30 23:01 GMT+08:00 Asaf Lahav asaf.la...@gmail.com:

 Hi Ladies and Gents,
 I would like to know what are the options I have if I would like to
 leverage Spark code I already have written to use a DB (Vertica) as its
 store/datasource.
 The data is of tabular nature. So any relational DB can essentially be
 used.

 Do I need to develop a context? If yes, how? where can I get a good
 example?


 Thank you,
 Asaf






Re: A Spark Design Problem

2014-10-31 Thread Sonal Goyal
Does the following help?

JavaPairRDD<bin,key> join with JavaPairRDD<bin,lock>

If you partition both RDDs by the bin id, I think you should be able to get
what you want.
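To make the idea concrete, here is a small sketch on plain Python lists of what the per-bin comparison computes. In Spark the grouping would come from joining or cogrouping the two pair RDDs on the bin id; the function name, the toy data, and the scoring function below are all hypothetical, and this version compares only items in the same bin (it does not look into neighbouring bins).

```python
from collections import defaultdict


def best_lock_per_key(keys, locks, score):
    """keys and locks are iterables of (bin_id, item) pairs.

    Returns {key: (best_lock, best_score)}, comparing only items in the same bin.
    """
    locks_by_bin = defaultdict(list)
    for bin_id, lock in locks:
        locks_by_bin[bin_id].append(lock)

    best = {}
    for bin_id, key in keys:
        for lock in locks_by_bin.get(bin_id, []):
            s = score(key, lock)
            if key not in best or s > best[key][1]:
                best[key] = (lock, s)
    return best


# Hypothetical data: items are numbers, and the score is higher when they are closer.
keys = [(0, 10), (0, 14), (1, 30)]
locks = [(0, 11), (0, 15), (1, 40), (2, 99)]
result = best_lock_per_key(keys, locks, lambda k, l: -abs(k - l))
```

Only the running best is kept per key, so the candidate pairs of a bin never need to be held in memory all at once.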

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Fri, Oct 31, 2014 at 11:19 PM, francois.garil...@typesafe.com wrote:

 Hi Steve,

 Are you talking about sequence alignment ?

 —
 FG


 On Fri, Oct 31, 2014 at 5:44 PM, Steve Lewis lordjoe2...@gmail.com
 wrote:


  The original problem is in biology but the following captures the CS
 issues, Assume I  have a large number of locks and a large number of keys.
 There is a scoring function between keys and locks and a key that  fits a
 lock will have a high score. There may be many keys fitting one lock and a
 key may fit no locks well. The object is to find the best fitting lock for
 each key.

 Assume that the number of keys and locks is high enough that taking the
 cartesian product of the two is computationally impractical. Also assume
 that keys and locks have an attached location which is accurate within an
 error (say 1 Km). Only keys and locks within 1 Km need be compared.
 Now assume I can create a JavaRDD<Keys> and a JavaRDD<Locks>. I could
 divide the locations into 1 Km squared bins and look only within a few
 bins. Assume that it is practical to take a cartesian product for all
 elements in a bin but not to keep all elements in memory. I could map my
 RDDs into PairRDDs where the key is the bin assigned by location

 I know how to take the cartesian product of two JavaRDDs but not how to
 take a cartesian product of sets of elements sharing a common key (bin),
 Any suggestions. Assume that in the worst cases the number of elements in a
 bin are too large to keep in memory although if a bin were subdivided into,
 say 100 subbins elements would fit in memory.

 Any thoughts as to how to attack the problem





Re: LinearRegression and model prediction threshold

2014-10-31 Thread Sonal Goyal
You can serialize the model to a local/hdfs file system and use it later
when you want.

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Sat, Nov 1, 2014 at 12:02 AM, Sean Owen so...@cloudera.com wrote:

 It sounds like you are asking about logistic regression, not linear
 regression. If so, yes that's just what it does. The default would be
 0.5 in logistic regression. If you 'clear' the threshold you get the
 raw margin out of this and other linear classifiers.
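A small sketch of the thresholding behaviour described above, written as plain Python rather than the MLlib API. The function name and the example weights are hypothetical; in this logistic sketch the unthresholded output is the sigmoid of the margin, which is an assumption about what the raw score looks like for this model type.

```python
import math


def predict(weights, intercept, features, threshold=0.5):
    """Logistic-regression style prediction.

    With a threshold, return class 1.0 or 0.0; with threshold=None
    (the 'cleared' case), return the raw score instead.
    """
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    score = 1.0 / (1.0 + math.exp(-margin))
    if threshold is None:
        return score
    return 1.0 if score > threshold else 0.0
```

Plain linear regression has no such step: its prediction is just the margin itself, which is why the threshold question only makes sense for the classifiers.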

 On Fri, Oct 31, 2014 at 7:18 PM, Sameer Tilak ssti...@live.com wrote:
  Hi All,
 
  I am using LinearRegression and have a question about the details of the
  model.predict method. Basically it is predicting variable y given an input
  vector x. However, can someone point me to the documentation about what is
  the threshold used in the predict method? Can that be changed? I am
  assuming that the input vector essentially gets mapped to a number and is
  compared against a threshold value and then y is either set to 0 or 1 based
  on those two numbers.
 
  Another question I have is if I want to save the model to hdfs for later
  reuse is there a recommended way for doing that?
 
  // Building the model
  val numIterations = 100
  val model = LinearRegressionWithSGD.train(parsedData, numIterations)
 
  // Evaluate model on training examples and compute training error
  val valuesAndPreds = parsedData.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
  }





Re: Doing RDD.count in parallel , at at least parallelize it as much as possible?

2014-10-30 Thread Sonal Goyal
Hey Sameer,

Wouldn't local[x] run the count in parallel in each of the x threads?

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Thu, Oct 30, 2014 at 11:42 PM, Sameer Farooqui same...@databricks.com
wrote:

 Hi Shahab,

 Are you running Spark in Local, Standalone, YARN or Mesos mode?

 If you're running in Standalone/YARN/Mesos, then the .count() action is
 indeed automatically parallelized across multiple Executors.

 When you run a .count() on an RDD, it is actually distributing tasks to
 different executors to each do a local count on a local partition and then
 all the tasks send their sub-counts back to the driver for final
 aggregation. This sounds like the kind of behavior you're looking for.
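The per-partition counting described above can be imitated in plain Python. This is only an analogy, not Spark code: a thread pool stands in for the executors, and the function names and the three-way split are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def count_partition(partition):
    """Count the elements of one partition, as a single task would."""
    return sum(1 for _ in partition)


def parallel_count(partitions, workers=4):
    """Count each partition in its own task, then sum the sub-counts."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        sub_counts = list(pool.map(count_partition, partitions))
    return sum(sub_counts)


# Hypothetical data split into three partitions.
total = parallel_count([range(0, 400), range(400, 700), range(700, 1000)])
```

The final sum over a handful of integers is the only work done in one place, which corresponds to the cheap aggregation step on the driver.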

 However, in Local mode, everything runs in a single JVM (the driver +
 executor), so there's no parallelization across Executors.



 On Thu, Oct 30, 2014 at 10:25 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I noticed that the count (of an RDD) in many of my queries is the most
 time-consuming step, as it runs in the driver process rather than being done
 by parallel worker nodes.

 Is there any way to perform the count in parallel, or at least parallelize
 it as much as possible?

 best,
 /Shahab





Re: Java api overhead?

2014-10-29 Thread Sonal Goyal
Thanks Koert. These numbers indeed tie back to our data and algorithms.
Would going the scala route save some memory, as the java API creates
wrapper Tuple2 for all pair functions?

On Wednesday, October 29, 2014, Koert Kuipers ko...@tresata.com wrote:

 Since Spark holds data structures on the heap (and by default tries to work
 with all data in memory) and it's written in Scala, seeing lots of Scala
 Tuple2 is not unexpected. How do these numbers relate to your data size?
 On Oct 27, 2014 2:26 PM, Sonal Goyal sonalgoy...@gmail.com wrote:

 Hi,

 I wanted to understand what kind of memory overheads are expected if at
 all while using the Java API. My application seems to have a lot of live
 Tuple2 instances and I am hitting a lot of gc so I am wondering if I am
 doing something fundamentally wrong. Here is what the top of my heap looks
 like. I actually create reifier.tuple.Tuple objects and pass them to map
 methods and mostly return Tuple2<Tuple,Tuple>. The heap seems to have far
 too many Tuple2 and $colon$colon.


  num     #instances         #bytes  class name
 ----------------------------------------------
    1:      85414872     2049956928  scala.collection.immutable.$colon$colon
    2:      85414852     2049956448  scala.Tuple2
    3:        304221       14765832  [C
    4:        302923        7270152  java.lang.String
    5:         44111        2624624  [Ljava.lang.Object;
    6:          1210        1495256  [B
    7:         39839         956136  java.util.ArrayList
    8:            29         950736  [Lscala.concurrent.forkjoin.ForkJoinTask;
    9:          8129         827792  java.lang.Class
   10:         33839         812136  java.lang.Long
   11:         33400         801600  reifier.tuple.Tuple
   12:          6116         538208  java.lang.reflect.Method
   13:         12767         408544  java.util.concurrent.ConcurrentHashMap$Node
   14:          5994         383616  org.apache.spark.scheduler.ResultTask
   15:         10298         329536  java.util.HashMap$Node
   16:         11988         287712  org.apache.spark.rdd.NarrowCoGroupSplitDep
   17:          5708         228320  reifier.block.Canopy
   18:             9         215784  [Lscala.collection.Seq;
   19:         12078         193248  java.lang.Integer
   20:         12019         192304  java.lang.Object
   21:          5708         182656  reifier.block.Tree
   22:          2776         173152  [I
   23:          6013         144312  scala.collection.mutable.ArrayBuffer
   24:          5994         143856  [Lorg.apache.spark.rdd.CoGroupSplitDep;
   25:          5994         143856  org.apache.spark.rdd.CoGroupPartition
   26:          5994         143856  org.apache.spark.rdd.ShuffledRDDPartition
   27:          4486         143552  java.util.Hashtable$Entry
   28:          6284         132800  [Ljava.lang.Class;
   29:          1819         130968  java.lang.reflect.Field
   30:           605         101208  [Ljava.util.HashMap$Node;



 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

 http://in.linkedin.com/in/sonalgoyal




-- 
Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal


Java api overhead?

2014-10-27 Thread Sonal Goyal
Hi,

I wanted to understand what kind of memory overheads are expected if at all
while using the Java API. My application seems to have a lot of live Tuple2
instances and I am hitting a lot of gc so I am wondering if I am doing
something fundamentally wrong. Here is what the top of my heap looks like.
I actually create reifier.tuple.Tuple objects and pass them to map methods
and mostly return Tuple2<Tuple,Tuple>. The heap seems to have far too many
Tuple2 and $colon$colon.


 num     #instances         #bytes  class name
----------------------------------------------
   1:      85414872     2049956928  scala.collection.immutable.$colon$colon
   2:      85414852     2049956448  scala.Tuple2
   3:        304221       14765832  [C
   4:        302923        7270152  java.lang.String
   5:         44111        2624624  [Ljava.lang.Object;
   6:          1210        1495256  [B
   7:         39839         956136  java.util.ArrayList
   8:            29         950736  [Lscala.concurrent.forkjoin.ForkJoinTask;
   9:          8129         827792  java.lang.Class
  10:         33839         812136  java.lang.Long
  11:         33400         801600  reifier.tuple.Tuple
  12:          6116         538208  java.lang.reflect.Method
  13:         12767         408544  java.util.concurrent.ConcurrentHashMap$Node
  14:          5994         383616  org.apache.spark.scheduler.ResultTask
  15:         10298         329536  java.util.HashMap$Node
  16:         11988         287712  org.apache.spark.rdd.NarrowCoGroupSplitDep
  17:          5708         228320  reifier.block.Canopy
  18:             9         215784  [Lscala.collection.Seq;
  19:         12078         193248  java.lang.Integer
  20:         12019         192304  java.lang.Object
  21:          5708         182656  reifier.block.Tree
  22:          2776         173152  [I
  23:          6013         144312  scala.collection.mutable.ArrayBuffer
  24:          5994         143856  [Lorg.apache.spark.rdd.CoGroupSplitDep;
  25:          5994         143856  org.apache.spark.rdd.CoGroupPartition
  26:          5994         143856  org.apache.spark.rdd.ShuffledRDDPartition
  27:          4486         143552  java.util.Hashtable$Entry
  28:          6284         132800  [Ljava.lang.Class;
  29:          1819         130968  java.lang.reflect.Field
  30:           605         101208  [Ljava.util.HashMap$Node;



Best Regards,
Sonal


Re: Rdd of Rdds

2014-10-22 Thread Sonal Goyal
Another approach could be to create artificial keys for each RDD and
convert them to pair RDDs. So your first RDD becomes
JavaPairRDD<Integer, String> rdd1 with values (1, 1); (1, 2) and so on.

The second RDD becomes rdd2 with values (2, a); (2, b); (2, c).

You can union the two RDDs, groupByKey, countByKey etc and maybe achieve
what you are trying to do. Sorry this is just a hypothesis, as I am not
entirely sure about what you are trying to achieve. Ideally, I would think
hard whether multiple RDDs are indeed needed, just as Sean pointed out.
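
To make the artificial-key idea concrete, here is a minimal sketch in plain Java collections rather than the Spark API, so it runs without a cluster. The class name TaggedUnionSketch and the helpers tag, groupByKey and countByKey are hypothetical stand-ins for building a pair RDD, unioning, and calling the corresponding pair-RDD operations; it is an illustration of the approach, not a Spark implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in (assumption: no Spark involved) for tagging each
// source collection with an artificial key, unioning, and grouping by key.
class TaggedUnionSketch {

    // Attach the same artificial key to every element of one source collection.
    static List<Map.Entry<Integer, String>> tag(int key, List<String> values) {
        List<Map.Entry<Integer, String>> tagged = new ArrayList<>();
        for (String v : values) {
            tagged.add(new SimpleEntry<Integer, String>(key, v));
        }
        return tagged;
    }

    // Collect the values that share an artificial key.
    static Map<Integer, List<String>> groupByKey(List<Map.Entry<Integer, String>> union) {
        Map<Integer, List<String>> grouped = new TreeMap<>();
        for (Map.Entry<Integer, String> e : union) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return grouped;
    }

    // Count the elements that share an artificial key.
    static Map<Integer, Long> countByKey(List<Map.Entry<Integer, String>> union) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (Map.Entry<Integer, String> e : union) {
            counts.merge(e.getKey(), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // "Union" of the two tagged collections from the example above.
        List<Map.Entry<Integer, String>> union =
                new ArrayList<>(tag(1, Arrays.asList("1", "2", "3")));
        union.addAll(tag(2, Arrays.asList("a", "b", "c")));

        System.out.println(groupByKey(union));
        System.out.println(countByKey(union));
    }
}
```

The point of the key is that a single collection can carry data from many sources, so per-source results come from one grouped pass instead of a nested RDD.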

Best Regards,
Sonal



On Wed, Oct 22, 2014 at 8:35 PM, Sean Owen so...@cloudera.com wrote:

 No, there's no such thing as an RDD of RDDs in Spark.
 Here though, why not just operate on an RDD of Lists? or a List of RDDs?
 Usually one of these two is the right approach whenever you feel
 inclined to operate on an RDD of RDDs.

 On Wed, Oct 22, 2014 at 3:58 PM, Tomer Benyamini tomer@gmail.com
 wrote:
  Hello,
 
  I would like to parallelize my work on multiple RDDs I have. I wanted
  to know if spark can support a foreach on an RDD of RDDs. Here's a
  java example:
 
  public static void main(String[] args) {

      SparkConf sparkConf = new SparkConf().setAppName("testapp");
      sparkConf.setMaster("local");

      JavaSparkContext sc = new JavaSparkContext(sparkConf);

      List<String> list = Arrays.asList(new String[] {"1", "2", "3"});
      JavaRDD<String> rdd = sc.parallelize(list);

      List<String> list1 = Arrays.asList(new String[] {"a", "b", "c"});
      JavaRDD<String> rdd1 = sc.parallelize(list1);

      List<JavaRDD<String>> rddList = new ArrayList<JavaRDD<String>>();
      rddList.add(rdd);
      rddList.add(rdd1);

      JavaRDD<JavaRDD<String>> rddOfRdds = sc.parallelize(rddList);
      System.out.println(rddOfRdds.count());

      rddOfRdds.foreach(new VoidFunction<JavaRDD<String>>() {

          @Override
          public void call(JavaRDD<String> t) throws Exception {
              System.out.println(t.count());
          }

      });
  }
 
  From this code I'm getting a NullPointerException on the internal count
  method:

  Exception in thread "main" org.apache.spark.SparkException: Job
  aborted due to stage failure: Task 1.0:0 failed 1 times, most recent
  failure: Exception failure in TID 1 on host localhost:
  java.lang.NullPointerException
      org.apache.spark.rdd.RDD.count(RDD.scala:861)
      org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:365)
      org.apache.spark.api.java.JavaRDD.count(JavaRDD.scala:29)
 
  Help will be appreciated.
 
  Thanks,
  Tomer
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





Re: Join with large data set

2014-10-17 Thread Sonal Goyal
Hi Ankur,

If your RDDs have common keys, you can look at partitioning both
datasets with the same custom partitioner on those keys, so that matching
keys land in the same partition. The join itself can then avoid a shuffle,
which should improve join performance.
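
As a rough illustration of why co-partitioning helps, here is a minimal sketch in plain Java (no Spark dependency, so it can be run as is). The names CoPartitionedJoinSketch, partitionOf, partitionByKey and join are hypothetical, and the sketch assumes each key appears at most once per dataset. In Spark, the corresponding step would be calling partitionBy with the same Partitioner on both pair RDDs before the join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (assumption: unique keys per dataset) of a join over
// two datasets that were split with the same partitioning function.
class CoPartitionedJoinSketch {

    // One partitioning function shared by both sides: equal keys always
    // map to the same partition index.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Split a key -> value dataset into numPartitions buckets by key.
    static List<Map<String, String>> partitionByKey(Map<String, String> data, int numPartitions) {
        List<Map<String, String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashMap<String, String>());
        }
        for (Map.Entry<String, String> e : data.entrySet()) {
            partitions.get(partitionOf(e.getKey(), numPartitions)).put(e.getKey(), e.getValue());
        }
        return partitions;
    }

    // Join bucket i of the left side only against bucket i of the right side.
    // Both lists must have been built with the same partition count.
    static Map<String, List<String>> join(List<Map<String, String>> left,
                                          List<Map<String, String>> right) {
        Map<String, List<String>> joined = new TreeMap<>();
        for (int i = 0; i < left.size(); i++) {
            Map<String, String> rightPart = right.get(i);
            for (Map.Entry<String, String> e : left.get(i).entrySet()) {
                String match = rightPart.get(e.getKey());
                if (match != null) {
                    joined.put(e.getKey(), Arrays.asList(e.getValue(), match));
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> application = new HashMap<>();
        application.put("a", "1");
        application.put("b", "2");
        application.put("c", "3");

        Map<String, String> reference = new HashMap<>();
        reference.put("b", "x");
        reference.put("c", "y");
        reference.put("d", "z");

        System.out.println(join(partitionByKey(application, 4), partitionByKey(reference, 4)));
    }
}
```

Because both sides use the same function and partition count, no record ever needs to be compared against a bucket with a different index, which is the property that lets the join skip moving data around again.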

HTH

Best Regards,
Sonal



On Fri, Oct 17, 2014 at 4:27 AM, Ankur Srivastava 
ankur.srivast...@gmail.com wrote:

 Hi,

 I have an RDD which is my application data and is huge. I want to join this
 with reference data which is also too large to fit in memory, and thus I do
 not want to use a broadcast variable.

 What other options do I have to perform such joins?

 I am using Cassandra as my data store, so should I just query Cassandra to
 get the reference data needed?

 Also, when I join two RDDs, will it result in an RDD scan, or would Spark do
 a hash partition on the two RDDs to get the data with the same keys on the
 same node?

 Thanks
 Ankur



Re: key class requirement for PairedRDD ?

2014-10-17 Thread Sonal Goyal
We use our own custom classes as keys through the Java API. They are
Serializable and have well-defined hashCode and equals methods. What's the
issue you are getting?
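
For reference, a minimal sketch of such a key class in plain Java. RecordKey and its fields are hypothetical, and the groupByKey helper uses a HashMap only to show the contract that key-based grouping relies on; it is not Spark's groupByKey.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class KeyClassSketch {

    // Hypothetical composite key; the field names are illustrative only.
    static final class RecordKey implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String source;
        private final int id;

        RecordKey(String source, int id) {
            this.source = source;
            this.id = id;
        }

        // Two keys are equal when all of their fields are equal.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof RecordKey)) return false;
            RecordKey other = (RecordKey) o;
            return id == other.id && Objects.equals(source, other.source);
        }

        // Must be consistent with equals: equal keys give equal hash codes.
        @Override
        public int hashCode() {
            return Objects.hash(source, id);
        }
    }

    // Group values by key; distinct key instances with equal fields end up
    // in the same group only because equals and hashCode are overridden.
    static Map<RecordKey, List<String>> groupByKey(List<RecordKey> keys, List<String> values) {
        Map<RecordKey, List<String>> grouped = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            grouped.computeIfAbsent(keys.get(i), k -> new ArrayList<>()).add(values.get(i));
        }
        return grouped;
    }
}
```

Without the two overrides, each new RecordKey instance would be treated as a different key, which is one common reason grouping appears not to work.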

Best Regards,
Sonal



On Fri, Oct 17, 2014 at 12:28 PM, Jaonary Rabarisoa jaon...@gmail.com
wrote:

 Dear all,

 Is it possible to use any kind of object as a key in a PairedRDD? When I
 use a case class as the key, the groupByKey operation doesn't behave as I
 expect. I want to use a case class to avoid using a large tuple, as it is
 easier to manipulate.


 Cheers,

 Jaonary



Re: Optimizing pairwise similarity computation or how to avoid RDD.cartesian operation ?

2014-10-17 Thread Sonal Goyal
Cartesian joins of large datasets are usually going to be slow. If there
is a way you can reduce the problem space to make sure you only join
subsets with each other, that may be helpful. Maybe if you explain your
problem in more detail, people on the list can come up with more
suggestions.
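
One way to read "only join subsets with each other" is blocking: assign each vector a coarse key and compare only vectors that share it. The sketch below is plain Java with a purely hypothetical blocking key (the index of the largest component); whether any such key suits your similarity function is an assumption, and pairs that fall into different blocks are never compared.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BlockedPairsSketch {

    // Hypothetical blocking key: index of the largest component of the vector.
    static int blockKey(double[] v) {
        int best = 0;
        for (int i = 1; i < v.length; i++) {
            if (v[i] > v[best]) {
                best = i;
            }
        }
        return best;
    }

    // Group vectors by their blocking key.
    static Map<Integer, List<double[]>> block(List<double[]> vectors) {
        Map<Integer, List<double[]>> blocks = new TreeMap<>();
        for (double[] v : vectors) {
            blocks.computeIfAbsent(blockKey(v), k -> new ArrayList<>()).add(v);
        }
        return blocks;
    }

    // Number of pairs to compare when only same-block vectors are paired,
    // instead of the full cartesian product a.size() * b.size().
    static long candidatePairs(List<double[]> a, List<double[]> b) {
        Map<Integer, List<double[]>> blocksB = block(b);
        long pairs = 0;
        for (Map.Entry<Integer, List<double[]>> e : block(a).entrySet()) {
            List<double[]> other = blocksB.get(e.getKey());
            if (other != null) {
                pairs += (long) e.getValue().size() * other.size();
            }
        }
        return pairs;
    }
}
```

In Spark terms the same idea would be a join on the blocking key rather than a cartesian, at the cost of missing similar pairs that the key separates.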

Best Regards,
Sonal



On Fri, Oct 17, 2014 at 4:13 PM, Jaonary Rabarisoa jaon...@gmail.com
wrote:

 Hi all,

 I need to compute a similarity between elements of two large sets of
 high-dimensional feature vectors.
 Naively, I create all possible pairs of vectors with
 features1.cartesian(features2) and then map the resulting pair RDD
 with my similarity function.

 The problem is that the cartesian operation takes a lot of time, more
 than computing the similarity itself. If I save each of my feature vectors
 to disk, form a list of file name pairs and compute the similarity by
 reading the files, it runs significantly faster.

 Any ideas will be helpful,

 Cheers,

 Jao





