Re: Spark Graphx with Database

2016-12-30 Thread Felix Cheung
You might want to check out GraphFrames, which lets you load database data (as a Spark
DataFrame) and build graphs from it:


https://github.com/graphframes/graphframes
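For reference, a minimal Scala sketch of that approach. The JDBC connection details, table and column names, and the way the package is added to the classpath are assumptions, not taken from this thread; GraphFrames expects an "id" column on the vertex DataFrame and "src"/"dst" columns on the edge DataFrame.

import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

val spark = SparkSession.builder().appName("graphframes-from-db").getOrCreate()

// Vertex table loaded over JDBC (connection details are illustrative).
val vertices = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/graphdb")
  .option("dbtable", "vertices")          // must expose an "id" column
  .option("user", "dbuser")
  .option("password", "dbpass")
  .load()

// Edge table: must expose "src" and "dst" columns.
val edges = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/graphdb")
  .option("dbtable", "edges")
  .option("user", "dbuser")
  .option("password", "dbpass")
  .load()

val g = GraphFrame(vertices, edges)
g.inDegrees.show()                        // graph queries run as DataFrame operations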

_
From: balaji9058 <kssb...@gmail.com>
Sent: Monday, December 26, 2016 9:27 PM
Subject: Spark Graphx with Database
To: user@spark.apache.org


Hi All,

I would like to know about Spark GraphX execution/processing with a database. Yes, I
understand Spark GraphX is in-memory processing, and to some extent we can manage
querying, but I would like to do much more complex queries or processing. Please
suggest a use case or the steps for doing this.









Re: RDD Location

2016-12-30 Thread Fei Hu
It would be much appreciated if you could give more details about why the runJob
function cannot be called in getPreferredLocations().

The NewHadoopRDD and HadoopRDD classes get the location information from the InputSplit.
But there may be an issue in NewHadoopRDD, because it generates all of the InputSplits on
the master node, which means only a single node can be used to generate and filter the
InputSplits even if their number is huge. Will that be a performance bottleneck?

Thanks,
Fei





On Fri, Dec 30, 2016 at 10:41 PM, Sun Rui  wrote:

> You can’t call runJob inside getPreferredLocations().
> You can take a look at the source  code of HadoopRDD to help you implement 
> getPreferredLocations()
> appropriately.
>
> On Dec 31, 2016, at 09:48, Fei Hu  wrote:
>
> That is a good idea.
>
> I tried adding the following code to the getPreferredLocations() function:
>
> val results: Array[Array[DataChunkPartition]] = context.runJob(
>   partitionsRDD, (context: TaskContext, partIter:
> Iterator[DataChunkPartition]) => partIter.toArray, dd, allowLocal = true)
>
> But it seems to be suspended when executing this function. But if I move
> the code to other places, like the main() function, it runs well.
>
> What is the reason for it?
>
> Thanks,
> Fei
>
> On Fri, Dec 30, 2016 at 2:38 AM, Sun Rui  wrote:
>
>> Maybe you can create your own subclass of RDD and override the
>> getPreferredLocations() to implement the logic of dynamic changing of the
>> locations.
>> > On Dec 30, 2016, at 12:06, Fei Hu  wrote:
>> >
>> > Dear all,
>> >
>> > Is there any way to change the host location for a certain partition of
>> RDD?
>> >
>> > "protected def getPreferredLocations(split: Partition)" can be used to
>> initialize the location, but how to change it after the initialization?
>> >
>> >
>> > Thanks,
>> > Fei
>> >
>> >
>>
>>
>>
>
>


Re: Difference in R and Spark Output

2016-12-30 Thread Felix Cheung
Could you elaborate more on the huge difference you are seeing?



From: Saroj C 
Sent: Friday, December 30, 2016 5:12:04 AM
To: User
Subject: Difference in R and Spark Output

Dear All,
 For the attached input file, there is a huge difference between the clusters produced
in R and in Spark (ML). Any idea what could cause the difference?

Note that we wanted to create five (5) clusters.

Please find the snippets in Spark and R

Spark

//Load the Data file

// Create K means Cluster
KMeans kmeans = new KMeans().setK(5).setMaxIter(500)

.setFeaturesCol("features").setPredictionCol("prediction");


In R

//Load the Data File into df

//Create the K Means Cluster

model <- kmeans(df, 5)



Thanks & Regards
Saroj



Re: launch spark on mesos within a docker container

2016-12-30 Thread Timothy Chen
It seems the master is receiving offer DECLINE calls, which suggests the framework is
receiving the offers and is able to reply.

Can you turn on TRACE logging in Spark for the Mesos coarse-grained scheduler and see
whether it is processing the offers?

Tim

On Fri, Dec 30, 2016 at 2:35 PM, Ji Yan  wrote:
> Thanks Timothy,
>
> Setting these four environment variables as you suggested has got the Spark
> running
>
> LIBPROCESS_ADVERTISE_IP=LIBPROCESS_ADVERTISE_PORT=40286
> LIBPROCESS_IP=0.0.0.0 LIBPROCESS_PORT=40286
>
> After that, it seems that Spark cannot accept any offer from mesos. If I run
> the same script outside the docker container, Spark can get resource and the
> Spark job runs successfully to end.
>
> Here is the mesos master log for running the Spark job inside the Docker
> container
>
> I1230 14:29:55.710973  9557 master.cpp:2500] Subscribing framework eval.py
> with checkpointing disabled and capabilities [ GPU_RESOURCES ]
>
> I1230 14:29:55.712379  9567 hierarchical.cpp:271] Added framework
> 993198d1-7393-4656-9f75-4f22702609d0-0251
>
> I1230 14:29:55.713717  9550 master.cpp:5709] Sending 1 offers to framework
> 993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
> scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286
>
> I1230 14:29:55.829774  9549 master.cpp:3951] Processing DECLINE call for
> offers: [ 993198d1-7393-4656-9f75-4f22702609d0-O1384 ] for framework
> 993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
> scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286
>
> I1230 14:30:01.055359  9569 http.cpp:381] HTTP GET for /master/state from
> 172.16.8.140:49406 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
> 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
> Safari/537.36'
>
> I1230 14:30:01.457598  9553 master.cpp:5709] Sending 1 offers to framework
> 993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
> scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286
>
> I1230 14:30:01.463732  9542 master.cpp:3951] Processing DECLINE call for
> offers: [ 993198d1-7393-4656-9f75-4f22702609d0-O1385 ] for framework
> 993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
> scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286
>
> I1230 14:30:02.300915  9562 http.cpp:381] HTTP GET for /master/state from
> 172.16.1.58:62629 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
> 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
> Safari/537.36'
>
> I1230 14:30:03.847647  9553 http.cpp:381] HTTP GET for /master/state from
> 172.16.8.140:49406 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
> 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
> Safari/537.36'
>
> I1230 14:30:04.431270  9551 http.cpp:381] HTTP GET for /master/state from
> 172.16.1.58:62629 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
> 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
> Safari/537.36'
>
> I1230 14:30:07.465801  9549 master.cpp:5709] Sending 1 offers to framework
> 993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
> scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286
>
> I1230 14:30:07.470860  9542 master.cpp:3951] Processing DECLINE call for
> offers: [ 993198d1-7393-4656-9f75-4f22702609d0-O1386 ] for framework
> 993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
> scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286
>
> I1230 14:30:11.077518  9572 http.cpp:381] HTTP GET for /master/state from
> 172.16.8.140:59764 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
> 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
> Safari/537.36'
>
> I1230 14:30:12.387562  9560 http.cpp:381] HTTP GET for /master/state from
> 172.16.1.58:62629 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
> 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
> Safari/537.36'
>
> I1230 14:30:12.473937  9572 master.cpp:5709] Sending 1 offers to framework
> 993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
> scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286
>
>
>
> On Fri, Dec 30, 2016 at 1:35 PM, Timothy Chen  wrote:
>>
>> Hi Ji,
>>
>> One way to make it fixed is to set LIBPROCESS_PORT environment variable on
>> the executor when it is launched.
>>
>> Tim
>>
>>
>> On Dec 30, 2016, at 1:23 PM, Ji Yan  wrote:
>>
>> Dear Spark Users,
>>
>> We are trying to launch Spark on Mesos from within a docker container. We
>> have found that since the Spark executors need to talk back at the Spark
>> driver, there is need to do a lot of port mapping to make that happen. We
>> seemed to have mapped the ports on what we could find from the documentation
>> page on spark configuration.
>>
>>> spark-2.1.0-bin-spark-2.1/bin/spark-submit \
>>>   --conf 'spark.driver.host'= \
>>>   --conf 'spark.blockManager.port'='40285' \
>>>   --conf 'spark.driver.bindAddress'='0.0.0.0' \
>>>   --conf 'spark.driver.port'='40284' \

[ML] Converting ml.DenseVector to mllib.Vector

2016-12-30 Thread Jason Wolosonovich

Hello All,

I'm working through the Data Science with Scala course on Big Data 
University. It has not been updated to work with Spark 2.0, so I'm adapting 
the code as I work through it; however, I've finally run into something 
that is over my head. I'm new to Scala as well.


When I run this code 
(https://gist.github.com/jmwoloso/a715cc4d7f1e7cc7951fab4edf6218b1) I 
get the following error:


`java.lang.ClassCastException: org.apache.spark.ml.linalg.DenseVector 
cannot be cast to org.apache.spark.mllib.linalg.Vector`


I believe this is occurring at line 107 of the gist above. The code 
starting at this line (and continuing to the end of the gist) is the 
current code in the course.


If I try to map to any other class type, then I have problems with the 
`Statistics.corr(rddVec)`.


How can I convert `rddVec` from an `ml.linalg.DenseVector` into an 
`mllib.linalg.Vector` for use with `Statistics`?


Thanks!

-Jason
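For reference, a minimal sketch of one way to do the conversion, assuming Spark 2.0's org.apache.spark.mllib.linalg.Vectors.fromML helper; the toy data and the "features" column name below are illustrative, not taken from the course code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.{Vector => MLVector, Vectors => MLVectors}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.stat.Statistics

val spark = SparkSession.builder().appName("vector-conversion-sketch").getOrCreate()
import spark.implicits._

// Toy DataFrame with an ml.linalg vector column named "features".
val df = Seq(
  MLVectors.dense(1.0, 2.0, 3.0),
  MLVectors.dense(4.0, 5.0, 6.0),
  MLVectors.dense(7.0, 8.0, 10.0)
).map(Tuple1.apply).toDF("features")

// Vectors.fromML converts a new-style ml vector into an old-style mllib vector,
// which is what the mllib Statistics API expects.
val rddVec = df.rdd.map(row => OldVectors.fromML(row.getAs[MLVector]("features")))

val corrMatrix = Statistics.corr(rddVec)   // Pearson correlation matrix
println(corrMatrix)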




Re: RDD Location

2016-12-30 Thread Sun Rui
You can’t call runJob inside getPreferredLocations().
You can take a look at the source code of HadoopRDD to help you implement 
getPreferredLocations() appropriately.
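For reference, a minimal sketch of such a subclass; the class name and the host-mapping function are hypothetical, and getPreferredLocations only gives the scheduler a locality hint, not a guarantee of placement.

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Wraps a parent RDD and steers each partition's preferred host(s)
// through a caller-supplied function (partition index -> hosts).
class HostAwareRDD[T: ClassTag](prev: RDD[T], hostsFor: Int => Seq[String])
  extends RDD[T](prev) {

  // Reuse the parent's partitions and computation unchanged.
  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    prev.iterator(split, context)

  // Only a locality hint; the values can be computed from driver-side state.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    hostsFor(split.index)
}

// Usage sketch: val placed = new HostAwareRDD(baseRdd, idx => Seq(hostForPartition(idx)))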
> On Dec 31, 2016, at 09:48, Fei Hu  wrote:
> 
> That is a good idea.
> 
> I tried adding the following code to the getPreferredLocations() function:
> 
> val results: Array[Array[DataChunkPartition]] = context.runJob(
>   partitionsRDD, (context: TaskContext, partIter: 
> Iterator[DataChunkPartition]) => partIter.toArray, dd, allowLocal = true)
> 
> But it seems to be suspended when executing this function. But if I move the 
> code to other places, like the main() function, it runs well.
> 
> What is the reason for it?
> 
> Thanks,
> Fei
> 
> On Fri, Dec 30, 2016 at 2:38 AM, Sun Rui  > wrote:
> Maybe you can create your own subclass of RDD and override the 
> getPreferredLocations() to implement the logic of dynamic changing of the 
> locations.
> > On Dec 30, 2016, at 12:06, Fei Hu  > > wrote:
> >
> > Dear all,
> >
> > Is there any way to change the host location for a certain partition of RDD?
> >
> > "protected def getPreferredLocations(split: Partition)" can be used to 
> > initialize the location, but how to change it after the initialization?
> >
> >
> > Thanks,
> > Fei
> >
> >
> 
> 
> 



Re: How to load a big csv to dataframe in Spark 1.6

2016-12-30 Thread Raymond Xie
Thanks Felix, I will try it tomorrow

~~~sent from my cell phone, sorry if there is any typo

On Dec 30, 2016 at 10:08 PM, "Felix Cheung" wrote:

> Have you tried the spark-csv package?
>
> https://spark-packages.org/package/databricks/spark-csv
>
>
> --
> *From:* Raymond Xie 
> *Sent:* Friday, December 30, 2016 6:46:11 PM
> *To:* user@spark.apache.org
> *Subject:* How to load a big csv to dataframe in Spark 1.6
>
> Hello,
>
> I see there is usually this way to load a csv to dataframe:
>
> sqlContext = SQLContext(sc)
>
> Employee_rdd = sc.textFile("\..\Employee.csv")
>.map(lambda line: line.split(","))
>
> Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])
>
> Employee_df.show()
>
> However in my case my csv has 100+ fields, which means toDF() will be very
> lengthy.
>
> Can anyone tell me a practical method to load the data?
>
> Thank you very much.
>
>
> *Raymond*
>
>


Re: How to load a big csv to dataframe in Spark 1.6

2016-12-30 Thread Raymond Xie
yes, I believe there should be a better way to handle my case.

~~~sent from my cell phone, sorry if there is any typo

On Dec 30, 2016 at 10:09 PM, "write2sivakumar@gmail" wrote:

Hi Raymond,

Is your problem passing those 100 fields to the .toDF() method?



Sent from my Samsung device


 Original message 
From: Raymond Xie 
Date: 31/12/2016 10:46 (GMT+08:00)
To: user@spark.apache.org
Subject: How to load a big csv to dataframe in Spark 1.6

Hello,

I see there is usually this way to load a csv to dataframe:

sqlContext = SQLContext(sc)

Employee_rdd = sc.textFile("\..\Employee.csv")
   .map(lambda line: line.split(","))

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])

Employee_df.show()

However in my case my csv has 100+ fields, which means toDF() will be very
lengthy.

Can anyone tell me a practical method to load the data?

Thank you very much.


*Raymond*


Re: How to load a big csv to dataframe in Spark 1.6

2016-12-30 Thread theodondre


You can use the StructType/StructField approach, or use the inferSchema 
approach.


Sent from my T-Mobile 4G LTE Device
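For reference, a minimal Scala sketch of the explicit-schema (StructType/StructField) route with the spark-csv package on Spark 1.6; the column names, file path, and the assumption that sc comes from spark-shell or your application are illustrative only.

// sc: SparkContext provided by spark-shell or your application;
// the spark-csv package must be on the classpath (see the other sketch in this digest).
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)

// Explicit schema: one StructField per column (only two shown here; a 100+ column
// schema can be built programmatically from a list of names instead of by hand).
val schema = StructType(Seq(
  StructField("Employee_ID", StringType, nullable = true),
  StructField("Employee_name", StringType, nullable = true)
))

val employeeDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(schema)
  .load("hdfs:///data/Employee.csv")   // path is illustrative

employeeDF.show()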

 Original message 
From: "write2sivakumar@gmail"  
Date: 12/30/16  10:08 PM  (GMT-05:00) 
To: Raymond Xie , user@spark.apache.org 
Subject: Re: How to load a big csv to dataframe in Spark 1.6 



Hi Raymond,
Is your problem passing those 100 fields to the .toDF() method?


Sent from my Samsung device

 Original message 
From: Raymond Xie  
Date: 31/12/2016  10:46  (GMT+08:00) 
To: user@spark.apache.org 
Subject: How to load a big csv to dataframe in Spark 1.6 

Hello,
I see there is usually this way to load a csv to dataframe:
sqlContext = SQLContext(sc)

Employee_rdd = sc.textFile("\..\Employee.csv")
   .map(lambda line: line.split(","))

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])

Employee_df.show()

However in my case my csv has 100+ fields, which means toDF() 
will be very lengthy.
Can anyone tell me a practical method to load the data?
Thank you very much.

Raymond







Re: How to load a big csv to dataframe in Spark 1.6

2016-12-30 Thread Felix Cheung
Have you tried the spark-csv package?

https://spark-packages.org/package/databricks/spark-csv
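For reference, a minimal Scala sketch of that suggestion for Spark 1.6; the package version and file path are assumptions, and the same options are available from PySpark via sqlContext.read.format(...).

// sc: SparkContext provided by spark-shell or your application.
// Launch with the package on the classpath, e.g.
//   --packages com.databricks:spark-csv_2.10:1.5.0   (version is an assumption)
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// header picks the 100+ column names up from the file; inferSchema guesses the types,
// so no hand-written toDF(...) column list is needed.
val employeeDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs:///data/Employee.csv")   // path is illustrative

employeeDF.show()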



From: Raymond Xie 
Sent: Friday, December 30, 2016 6:46:11 PM
To: user@spark.apache.org
Subject: How to load a big csv to dataframe in Spark 1.6

Hello,

I see there is usually this way to load a csv to dataframe:


sqlContext = SQLContext(sc)

Employee_rdd = sc.textFile("\..\Employee.csv")
   .map(lambda line: line.split(","))

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])

Employee_df.show()

However in my case my csv has 100+ fields, which means toDF() will be very 
lengthy.

Can anyone tell me a practical method to load the data?

Thank you very much.


Raymond



Re: How to load a big csv to dataframe in Spark 1.6

2016-12-30 Thread write2sivakumar@gmail


Hi Raymond,
Is your problem passing those 100 fields to the .toDF() method?


Sent from my Samsung device

 Original message 
From: Raymond Xie  
Date: 31/12/2016  10:46  (GMT+08:00) 
To: user@spark.apache.org 
Subject: How to load a big csv to dataframe in Spark 1.6 

Hello,
I see there is usually this way to load a csv to dataframe:
sqlContext = SQLContext(sc)

Employee_rdd = sc.textFile("\..\Employee.csv")
   .map(lambda line: line.split(","))

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])

Employee_df.show()

However in my case my csv has 100+ fields, which means toDF() 
will be very lengthy.
Can anyone tell me a practical method to load the data?
Thank you very much.

Raymond







How to load a big csv to dataframe in Spark 1.6

2016-12-30 Thread Raymond Xie
Hello,

I see there is usually this way to load a csv to dataframe:

sqlContext = SQLContext(sc)

Employee_rdd = sc.textFile("\..\Employee.csv")
   .map(lambda line: line.split(","))

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])

Employee_df.show()

However in my case my csv has 100+ fields, which means toDF() will be very
lengthy.

Can anyone tell me a practical method to load the data?

Thank you very much.


*Raymond*


Re: Dependency Injection and Microservice development with Spark

2016-12-30 Thread Muthu Jayakumar
Adding to Lars Albertsson's and Miguel Morales's points, I am hoping to see how well
scalameta's macro support develops, which could do away with sizable DI problems; for
the remainder, having a class type as an argument, as Miguel Morales mentioned, works
well.

Thanks,


On Wed, Dec 28, 2016 at 6:41 PM, Miguel Morales 
wrote:

> Hi
>
> Not sure about Spring Boot, but trying to use DI libraries you'll run into
> serialization issues. I've had luck using an old version of Scaldi.
> Recently, though, I've been passing the class types as arguments with default
> values.  Then in the Spark code it gets instantiated.  So you're basically
> passing and serializing a class name.
>
> Sent from my iPhone
>
> > On Dec 28, 2016, at 1:55 PM, Lars Albertsson  wrote:
> >
> > Do you really need dependency injection?
> >
> > DI is often used for testing purposes. Data processing jobs are easy
> > to test without DI, however, due to their functional and synchronous
> > nature. Hence, DI is often unnecessary for testing data processing
> > jobs, whether they are batch or streaming jobs.
> >
> > Or do you want to use DI for other reasons?
> >
> >
> > Lars Albertsson
> > Data engineering consultant
> > www.mapflat.com
> > https://twitter.com/lalleal
> > +46 70 7687109
> > Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
> >
> >
> > On Fri, Dec 23, 2016 at 11:56 AM, Chetan Khatri
> >  wrote:
> >> Hello Community,
> >>
> >> Current approach I am using for Spark Job Development with Scala + SBT
> and
> >> Uber Jar with yml properties file to pass configuration parameters. But
> If i
> >> would like to use Dependency Injection and MicroService Development like
> >> Spring Boot feature in Scala then what would be the standard approach.
> >>
> >> Thanks
> >>
> >> Chetan
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
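For reference, a minimal Scala sketch of the pattern Miguel describes (pass a class name, instantiate it inside the job); the Formatter trait and the class names are hypothetical.

trait Formatter extends Serializable {
  def format(value: String): String
}

class UpperCaseFormatter extends Formatter {
  override def format(value: String): String = value.toUpperCase
}

object ClassNameInjectionSketch {
  def main(args: Array[String]): Unit = {
    val spark = org.apache.spark.sql.SparkSession.builder()
      .appName("class-name-injection").getOrCreate()

    // Only the class name (a plain String) crosses the driver/executor boundary,
    // with a default value as described above.
    val formatterClassName = args.headOption.getOrElse("UpperCaseFormatter")

    val result = spark.sparkContext
      .parallelize(Seq("alpha", "beta"))
      .mapPartitions { rows =>
        // Instantiated on the executor, so the implementation is never serialized.
        val formatter = Class.forName(formatterClassName)
          .getDeclaredConstructor().newInstance().asInstanceOf[Formatter]
        rows.map(formatter.format)
      }
      .collect()

    result.foreach(println)
    spark.stop()
  }
}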


context.runJob() was suspended in getPreferredLocations() function

2016-12-30 Thread Fei Hu
Dear all,

I tried to customize my own RDD. In the getPreferredLocations() function, I
used the following code to query another RDD, which was used as an input to
initialize this customized RDD:

    val results: Array[Array[DataChunkPartition]] =
context.runJob(partitionsRDD, (context: TaskContext, partIter:
Iterator[DataChunkPartition]) => partIter.toArray, partitions, allowLocal =
true)

The problem is that when executing the above code, the task seemed to be
suspended. I mean the job just stopped at this code, but no errors and no
outputs.

What is the reason for it?

Thanks,
Fei


Re: RDD Location

2016-12-30 Thread Fei Hu
That is a good idea.

I tried adding the following code to the getPreferredLocations() function:

val results: Array[Array[DataChunkPartition]] = context.runJob(
  partitionsRDD, (context: TaskContext, partIter:
Iterator[DataChunkPartition]) => partIter.toArray, dd, allowLocal = true)

But it seems to be suspended when executing this function, whereas if I move
the code to other places, like the main() function, it runs well.

What is the reason for it?

Thanks,
Fei

On Fri, Dec 30, 2016 at 2:38 AM, Sun Rui  wrote:

> Maybe you can create your own subclass of RDD and override the
> getPreferredLocations() to implement the logic of dynamic changing of the
> locations.
> > On Dec 30, 2016, at 12:06, Fei Hu  wrote:
> >
> > Dear all,
> >
> > Is there any way to change the host location for a certain partition of
> RDD?
> >
> > "protected def getPreferredLocations(split: Partition)" can be used to
> initialize the location, but how to change it after the initialization?
> >
> >
> > Thanks,
> > Fei
> >
> >
>
>
>


Re: Best way to process lookup ETL with Dataframes

2016-12-30 Thread Nicholas Hakobian
Yep, sequential joins is what I have done in the past with similar
requirements.

Splitting and merging DataFrames is most likely killing performance if you
do not cache the DataFrame pre-split. If you do, it will compute the
lineage prior to the cache statement once (at first invocation), then use
the cached result to perform the additional join, then union the results.
Without the cache, you are most likely computing the full lineage twice,
all the way back to the raw data import and having double the read I/O.

The most optimal path will most likely depend on the size of the tables you
are joining to. If both are small (compared to the primary data source) and
can be broadcasted, doing the sequential join will most likely be the
easiest and most efficient approach. If one (or both) of the tables you are
joining to are significantly large enough that they cannot be efficiently
broadcasted, going through the join / cache / split / second join / union
path is likely to be faster. It also depends on how much memory you can
dedicate to caching...the possibilities are endless.
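For reference, a minimal Spark 1.5-style sketch of the broadcast-join hint mentioned above; the toy tables, column names, and the assumption that sc comes from spark-shell or your application are illustrative only.

// sc: SparkContext provided by spark-shell or your application (Spark 1.5 style).
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Toy stand-ins: a large driving table and a small lookup table.
val driving = sc.parallelize(1 to 1000000).map(i => (i.toString, s"row_$i")).toDF("id", "payload")
val lookup1 = Seq(("1", "alpha"), ("2", "beta")).toDF("id", "col")

// broadcast() hints that lookup1 should be shipped whole to every executor,
// so the driving table is not shuffled for the join.
val enriched = driving.join(broadcast(lookup1), driving("id") === lookup1("id"), "left_outer")
enriched.show(5)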

I tend to approach this type of problem by weighing the cost of extra
development time for a more complex join vs the extra execution time vs
frequency of execution. For something that will execute daily (or more
frequently) the cost of more development to have faster execution time
(even if its only 2x faster) might be worth it.

It might also be worth investigating if a newer version of Spark (1.6 at
the least, or 2.0 if possible) is feasible to install. There are lots of
performance improvements in those versions, if you have the option of
upgrading.

-Nick

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Fri, Dec 30, 2016 at 3:35 PM, Sesterhenn, Mike 
wrote:

> Thanks Nicholas.  It looks like for some of my use cases, I might be able
> to do sequential joins, and then use coalesce() (or in combination with
> withColumn(when()...)) to sort out the results.
>
>
> Splitting and merging dataframes seems to really kill my app
> performance.  I'm not sure if it's a spark 1.5 thing or what, but I just
> refactored one column to do one less split/merge, and it saved me almost
> half the time on my job.  But for some use cases I don't seem to be able to
> avoid them.  It is important in some cases to NOT do a join under certain
> conditions for a row because bad data will result.
>
>
> Any other thoughts?
> --
> *From:* Nicholas Hakobian 
> *Sent:* Friday, December 30, 2016 2:12:40 PM
> *To:* Sesterhenn, Mike
> *Cc:* ayan guha; user@spark.apache.org
>
> *Subject:* Re: Best way to process lookup ETL with Dataframes
>
> It looks like Spark 1.5 has the coalesce function, which is like NVL, but
> a bit more flexible. From Ayan's example you should be able to use:
> coalesce(b.col, c.col, 'some default')
>
> If that doesn't have the flexibility you want, you can always use nested
> case or if statements, but its just harder to read.
>
> Nicholas Szandor Hakobian, Ph.D.
> Senior Data Scientist
> Rally Health
> nicholas.hakob...@rallyhealth.com
>
>
>
> On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike 
> wrote:
>
>> Thanks, but is nvl() in Spark 1.5?  I can't find it in
>> spark.sql.functions (http://spark.apache.org/docs/
>> 1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)
>>
>>
>> Reading about the Oracle nvl function, it seems it is similar to the na
>> functions.  Not sure it will help though, because what I need is to join
>> after the first join fails.
>>
>> --
>> *From:* ayan guha 
>> *Sent:* Thursday, December 29, 2016 11:06 PM
>> *To:* Sesterhenn, Mike
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Best way to process lookup ETL with Dataframes
>>
>> How about this -
>>
>> select a.*, nvl(b.col,nvl(c.col,'some default'))
>> from driving_table a
>> left outer join lookup1 b on a.id=b.id
> >> left outer join lookup2 c on a.id=c.id
>>
>> ?
>>
>> On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike 
>> wrote:
>>
>>> Hi all,
>>>
>>>
>>> I'm writing an ETL process with Spark 1.5, and I was wondering the best
>>> way to do something.
>>>
>>>
>>> A lot of the fields I am processing require an algorithm similar to this:
>>>
>>>
>>> Join input dataframe to a lookup table.
>>>
>>> if (that lookup fails (the joined fields are null)) {
>>>
>>> Lookup into some other table to join some other fields.
>>>
>>> }
>>>
>>>
>>> With Dataframes, it seems the only way to do this is to do something
>>> like this:
>>>
>>>
>>> Join input dataframe to a lookup table.
>>>
>>> if (that lookup fails (the joined fields are null)) {
>>>
>>>*SPLIT the dataframe into two DFs via DataFrame.filter(),
>>>
>>>   one group with successful lookup, the other failed).*
>>>
>>>For failed lookup:  {
>>>
>>>Lookup into some other table to grab some other fields.
>>>
>>>}
>>>
>>>*MERGE the dataframe splits back together via Da

Re: Best way to process lookup ETL with Dataframes

2016-12-30 Thread Sesterhenn, Mike
Thanks Nicholas.  It looks like for some of my use cases, I might be able to 
do sequential joins, and then use coalesce() (or in combination with 
withColumn(when()...)) to sort out the results.


Splitting and merging dataframes seems to really kill my app performance.  I'm 
not sure if it's a spark 1.5 thing or what, but I just refactored one column to 
do one less split/merge, and it saved me almost half the time on my job.  But 
for some use cases I don't seem to be able to avoid them.  It is important in 
some cases to NOT do a join under certain conditions for a row because bad data 
will result.


Any other thoughts?


From: Nicholas Hakobian 
Sent: Friday, December 30, 2016 2:12:40 PM
To: Sesterhenn, Mike
Cc: ayan guha; user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

It looks like Spark 1.5 has the coalesce function, which is like NVL, but a bit 
more flexible. From Ayan's example you should be able to use:
coalesce(b.col, c.col, 'some default')

If that doesn't have the flexibility you want, you can always use nested case 
or if statements, but its just harder to read.

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com



On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike 
mailto:msesterh...@cars.com>> wrote:

Thanks, but is nvl() in Spark 1.5?  I can't find it in spark.sql.functions 
(http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)


Reading about the Oracle nvl function, it seems it is similar to the na 
functions.  Not sure it will help though, because what I need is to join after 
the first join fails.


From: ayan guha mailto:guha.a...@gmail.com>>
Sent: Thursday, December 29, 2016 11:06 PM
To: Sesterhenn, Mike
Cc: user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

How about this -

select a.*, nvl(b.col,nvl(c.col,'some default'))
from driving_table a
left outer join lookup1 b on a.id=b.id
left outer join lookup2 c on a.id=c.id

?

On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike 
mailto:msesterh...@cars.com>> wrote:

Hi all,


I'm writing an ETL process with Spark 1.5, and I was wondering the best way to 
do something.


A lot of the fields I am processing require an algorithm similar to this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

Lookup into some other table to join some other fields.

}


With Dataframes, it seems the only way to do this is to do something like this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

   *SPLIT the dataframe into two DFs via DataFrame.filter(),

  one group with successful lookup, the other failed).*

   For failed lookup:  {

   Lookup into some other table to grab some other fields.

   }

   *MERGE the dataframe splits back together via DataFrame.unionAll().*

}


I'm seeing some really large execution plans as you might imagine in the Spark 
UI, and the processing time seems way out of proportion with the size of the 
dataset.  (~250GB in 9 hours).


Is this the best approach to implement an algorithm like this?  Note also that 
some fields I am implementing require multiple staged split/merge steps due to 
cascading lookup joins.


Thanks,


Michael Sesterhenn

msesterh...@cars.com




--
Best Regards,
Ayan Guha



Re: launch spark on mesos within a docker container

2016-12-30 Thread Ji Yan
Thanks Timothy,

Setting these four environment variables as you suggested has got the Spark
running

LIBPROCESS_ADVERTISE_IP=LIBPROCESS_ADVERTISE_PORT=40286
LIBPROCESS_IP=0.0.0.0 LIBPROCESS_PORT=40286

After that, it seems that Spark cannot accept any offer from Mesos. If I
run the same script outside the docker container, Spark can get resources
and the Spark job runs successfully to the end.

Here is the mesos master log for running the Spark job inside the Docker
container

I1230 14:29:55.710973  9557 master.cpp:2500] Subscribing framework eval.py
with checkpointing disabled and capabilities [ GPU_RESOURCES ]

I1230 14:29:55.712379  9567 hierarchical.cpp:271] Added framework
993198d1-7393-4656-9f75-4f22702609d0-0251

I1230 14:29:55.713717  9550 master.cpp:5709] Sending 1 offers to framework
993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286

I1230 14:29:55.829774  9549 master.cpp:3951] Processing DECLINE call for
offers: [ 993198d1-7393-4656-9f75-4f22702609d0-O1384 ] for framework
993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286

I1230 14:30:01.055359  9569 http.cpp:381] HTTP GET for /master/state from
172.16.8.140:49406 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
Safari/537.36'

I1230 14:30:01.457598  9553 master.cpp:5709] Sending 1 offers to framework
993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286

I1230 14:30:01.463732  9542 master.cpp:3951] Processing DECLINE call for
offers: [ 993198d1-7393-4656-9f75-4f22702609d0-O1385 ] for framework
993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286

I1230 14:30:02.300915  9562 http.cpp:381] HTTP GET for /master/state from
172.16.1.58:62629 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
Safari/537.36'

I1230 14:30:03.847647  9553 http.cpp:381] HTTP GET for /master/state from
172.16.8.140:49406 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
Safari/537.36'

I1230 14:30:04.431270  9551 http.cpp:381] HTTP GET for /master/state from
172.16.1.58:62629 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
Safari/537.36'

I1230 14:30:07.465801  9549 master.cpp:5709] Sending 1 offers to framework
993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286

I1230 14:30:07.470860  9542 master.cpp:3951] Processing DECLINE call for
offers: [ 993198d1-7393-4656-9f75-4f22702609d0-O1386 ] for framework
993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286

I1230 14:30:11.077518  9572 http.cpp:381] HTTP GET for /master/state from
172.16.8.140:59764 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
Safari/537.36'

I1230 14:30:12.387562  9560 http.cpp:381] HTTP GET for /master/state from
172.16.1.58:62629 with User-Agent='Mozilla/5.0 (Macintosh; Intel Mac OS X
10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95
Safari/537.36'

I1230 14:30:12.473937  9572 master.cpp:5709] Sending 1 offers to framework
993198d1-7393-4656-9f75-4f22702609d0-0251 (eval.py) at
scheduler-9300fd07-7cf5-4341-84c9-4f1930e8c145@172.16.1.101:40286


On Fri, Dec 30, 2016 at 1:35 PM, Timothy Chen  wrote:

> Hi Ji,
>
> One way to make it fixed is to set LIBPROCESS_PORT environment variable on
> the executor when it is launched.
>
> Tim
>
>
> On Dec 30, 2016, at 1:23 PM, Ji Yan  wrote:
>
> Dear Spark Users,
>
> We are trying to launch Spark on Mesos from within a docker container. We
> have found that since the Spark executors need to talk back at the Spark
> driver, there is need to do a lot of port mapping to make that happen. We
> seemed to have mapped the ports on what we could find from the
> documentation page on spark configuration.
>
> spark-2.1.0-bin-spark-2.1/bin/spark-submit \
>>   --conf 'spark.driver.host'= \
>>   --conf 'spark.blockManager.port'='40285' \
>>   --conf 'spark.driver.bindAddress'='0.0.0.0' \
>>   --conf 'spark.driver.port'='40284' \
>>   --conf 'spark.mesos.executor.docker.volumes'='spark-2.1.0-bin-
>> spark-2.1:/spark-2.1.0-bin-spark-2.1' \
>>   --conf 'spark.mesos.gpus.max'='2' \
>>   --conf 'spark.mesos.containerizer'='docker' \
>>   --conf 'spark.mesos.executor.docker.image'='docker.drive.ai/spark_
>> gpu_experiment:latest' \
>>   --master 'mesos://mesos_master_dev:5050' \
>>   -v eval.py
>
>
> When we launched Spark this way, from the Mesos master log. It seems that
> the mesos master is tryin

Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

2016-12-30 Thread Marco Mistroni
Hi Palash

so you have a pyspark application running on spark 2.0
You have python scripts dropping files on HDFS
then you have two spark job
- 1 loads the expected hour data (please explain: how many files on average?)
- 1 loads the delayed data (please explain: how many files on average?)

Do these scripts run continuously (do they have a while loop), or do you kick them
off via a job scheduler on an hourly basis?
Do these scripts run on a cluster?


So, at T1 in HDFS there are 3 csv files. Your job starts up and load all 3
of them, does aggregation etc then populate mongo
At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2
additonal. Presumably these files are not deleted). So your job now loads 5
files, does aggregation and store data in mongodb? Or does your job at T+1
only loads deltas (the two new csv files which appeared at T+1)?

You said before that simply parsing csv files via Spark in a standalone app
works fine. Then what you can try is to do exactly the same processing you
are doing, but instead of loading csv files from HDFS, load them from a
local directory and see if the problem persists (this is just to exclude
any issues with loading HDFS data).

hth
   Marco












On Fri, Dec 30, 2016 at 2:02 PM, Palash Gupta 
wrote:

> Hi Marco & Ayan,
>
> I now have a clearer idea about what Marco means by reduce. I will do it to
> dig down further.
>
> Let me answer to your queries:
>
> When you see the broadcast errors, does your job terminate?
> Palash>> Yes it terminated the app.
>
> Or are you assuming that something is wrong just because you see the
> message in the logs?
>
> Palash>> No it terminated for the very first step of Spark processing (in
> my case loading csv from hdfs)
>
> Plus... w.r.t. the logic: who writes the CSV? With what frequency?
> Palash>> We parsed xml files using python (not in spark scope) & make csv
> and put in hdfs
>
> Does it app run all the time loading CSV from hadoop?
>
> Palash>> Every hour two separate pyspark app are running
> 1. Loading current expected hour data, prepare kpi, do aggregation, load
> in mongodb
> 2. Same operation will run for delayed hour data
>
>
> Are you using spark streaming?
> Palash>> No
>
> Does it app run fine with an older version of spark (1.6 )
> Palash>> I didn't test with Spark 1.6. My app is running now good as I
> stopped second app (delayed data loading) since last two days. Even most of
> the case both are running well except few times...
>
>
> Sent from Yahoo Mail on Android
> 
>
> On Fri, 30 Dec, 2016 at 4:57 pm, Marco Mistroni
>  wrote:
> Correct. I mean reduce the functionality.
> Uhm I realised I didn't ask u a fundamental question. When you see the
> broadcast errors, does your job terminate? Or are you assuming that
> something is wrong just because you see the message in the logs?
> Plus... w.r.t. the logic: who writes the CSV? With what frequency?
> Does it app run all the time loading CSV from hadoop?
> Are you using spark streaming?
> Does it app run fine with an older version of spark (1.6 )
> Hth
>
> On 30 Dec 2016 12:44 pm, "ayan guha"  wrote:
>
>> @Palash: I think what Macro meant by "reduce functionality" is to reduce
>> scope of your application's functionality so that you can isolate the issue
>> in certain part(s) of the app...I do not think he meant "reduce" operation
>> :)
>>
>> On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta > invalid> wrote:
>>
>>> Hi Marco,
>>>
>>> All of your suggestions are highly appreciated, whatever you said so
>>> far. I would apply to implement in my code and let you know.
>>>
>>> Let me answer your query:
>>>
>>> What does your program do?
>>> Palash>> In each hour I am loading many CSV files and then I'm making
>>> some KPI(s) out of them. Finally I am doing some aggregation and inserting
>>> into mongodb from spark.
>>>
>>> you say it runs for 2-3 hours, what is the logic? just processing a huge
>>> amount of data? doing ML ?
>>> Palash>> Yes you are right whatever I'm processing it should not take
>>> much time. Initially my processing was taking only 5 minutes as I was using
>>> all cores running only one application. When I created more separate spark
>>> applications for handling delayed data loading and implementing more use
>>> cases with parallel run, I started facing the error randomly. And due to
>>> separate resource distribution among four parallel spark application to run
>>> in parallel now some task is taking longer time than usual. But still it
>>> should not take 2-3 hours time...
>>>
>>> Currently whole applications are running in a development environment
>>> where we have only two VM cluster and I will migrate to production platform
>>> by next week. I will let you know if there is any improvement over there.
>>>
>>> I'd say break down your application..  reduce functionality , run and
>>> see outcome. then add more functionality, run and see again.
>>>
>>> Palash>> Macro as I'm not very good in Spark. It would be helpful for me
>>> if 

Re: launch spark on mesos within a docker container

2016-12-30 Thread Timothy Chen
Hi Ji,

One way to make it fixed is to set LIBPROCESS_PORT environment variable on the 
executor when it is launched.

Tim
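For reference, a hedged Scala sketch of one way to pass a fixed LIBPROCESS_PORT to the executors via Spark's spark.executorEnv.* configuration, set on the driver; the port values below are the ones discussed in this thread used as placeholders, and whether this set of variables is sufficient for a given Docker/Mesos setup is an assumption.

import org.apache.spark.{SparkConf, SparkContext}

// spark.executorEnv.<NAME> sets an environment variable in the launched executors.
val conf = new SparkConf()
  .setAppName("mesos-docker-sketch")
  .set("spark.executorEnv.LIBPROCESS_PORT", "40286")   // fixed libprocess port (assumed value)
  .set("spark.blockManager.port", "40285")
  .set("spark.driver.port", "40284")

val sc = new SparkContext(conf)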


> On Dec 30, 2016, at 1:23 PM, Ji Yan  wrote:
> 
> Dear Spark Users,
> 
> We are trying to launch Spark on Mesos from within a docker container. We 
> have found that since the Spark executors need to talk back at the Spark 
> driver, there is need to do a lot of port mapping to make that happen. We 
> seemed to have mapped the ports on what we could find from the documentation 
> page on spark configuration.
> 
>> spark-2.1.0-bin-spark-2.1/bin/spark-submit \
>>   --conf 'spark.driver.host'= \
>>   --conf 'spark.blockManager.port'='40285' \
>>   --conf 'spark.driver.bindAddress'='0.0.0.0' \
>>   --conf 'spark.driver.port'='40284' \
>>   --conf 
>> 'spark.mesos.executor.docker.volumes'='spark-2.1.0-bin-spark-2.1:/spark-2.1.0-bin-spark-2.1'
>>  \
>>   --conf 'spark.mesos.gpus.max'='2' \
>>   --conf 'spark.mesos.containerizer'='docker' \
>>   --conf 
>> 'spark.mesos.executor.docker.image'='docker.drive.ai/spark_gpu_experiment:latest'
>>  \
>>   --master 'mesos://mesos_master_dev:5050' \
>>   -v eval.py
> 
> When we launched Spark this way, from the Mesos master log. It seems that the 
> mesos master is trying to make the offer back to the framework at port 33978 
> which turns out to be a dynamic port. The job failed at this point because it 
> looks like that the offer cannot reach back to the container. In order to 
> expose that port in the container, we'll need to make it fixed first, does 
> anyone know how to make that port fixed in spark configuration? Any other 
> advice on how to launch Spark on mesos from within docker container is 
> greatly appreciated
> 
> I1230 12:53:54.758297  9571 master.cpp:2424] Received SUBSCRIBE call for 
> framework 'eval.py' at 
> scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
> I1230 12:53:54.758608  9571 master.cpp:2500] Subscribing framework eval.py 
> with checkpointing disabled and capabilities [ GPU_RESOURCES ]
> I1230 12:53:54.760036  9569 hierarchical.cpp:271] Added framework 
> 993198d1-7393-4656-9f75-4f22702609d0-0233
> I1230 12:53:54.761533  9549 master.cpp:5709] Sending 1 offers to framework 
> 993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at 
> scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@:33978
> E1230 12:53:57.757814  9573 process.cpp:2105] Failed to shutdown socket with 
> fd 22: Transport endpoint is not connected
> I1230 12:53:57.758314  9543 master.cpp:1284] Framework 
> 993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at 
> scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978 disconnected
> I1230 12:53:57.758378  9543 master.cpp:2725] Disconnecting framework 
> 993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at 
> scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
> I1230 12:53:57.758411  9543 master.cpp:2749] Deactivating framework 
> 993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at 
> scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
> I1230 12:53:57.758582  9548 hierarchical.cpp:382] Deactivated framework 
> 993198d1-7393-4656-9f75-4f22702609d0-0233
> W1230 12:53:57.758915  9543 master.hpp:2113] Master attempted to send message 
> to disconnected framework 993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) 
> at scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
> I1230 12:53:57.759140  9543 master.cpp:1297] Giving framework 
> 993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at 
> scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978 0ns to 
> failover
> I1230 12:53:57.760573  9561 master.cpp:5561] Framework failover timeout, 
> removing framework 993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at 
> scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
> I1230 12:53:57.760648  9561 master.cpp:6296] Removing framework 
> 993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at 
> scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
> I1230 12:53:57.761493  9571 hierarchical.cpp:333] Removed framework 
> 993198d1-7393-4656-9f75-4f22702609d0-0233
> 
> The information in this email is confidential and may be legally privileged. 
> It is intended solely for the addressee. Access to this email by anyone else 
> is unauthorized. If you are not the intended recipient, any disclosure, 
> copying, distribution or any action taken or omitted to be taken in reliance 
> on it, is prohibited and may be unlawful.


launch spark on mesos within a docker container

2016-12-30 Thread Ji Yan
Dear Spark Users,

We are trying to launch Spark on Mesos from within a docker container. We
have found that since the Spark executors need to talk back at the Spark
driver, there is need to do a lot of port mapping to make that happen. We
seemed to have mapped the ports on what we could find from the
documentation page on spark configuration.

spark-2.1.0-bin-spark-2.1/bin/spark-submit \
>   --conf 'spark.driver.host'= \
>   --conf 'spark.blockManager.port'='40285' \
>   --conf 'spark.driver.bindAddress'='0.0.0.0' \
>   --conf 'spark.driver.port'='40284' \
>   --conf 'spark.mesos.executor.docker.volumes'='
> spark-2.1.0-bin-spark-2.1:/spark-2.1.0-bin-spark-2.1' \
>   --conf 'spark.mesos.gpus.max'='2' \
>   --conf 'spark.mesos.containerizer'='docker' \
>   --conf 'spark.mesos.executor.docker.image'='
> docker.drive.ai/spark_gpu_experiment:latest' \
>   --master 'mesos://mesos_master_dev:5050' \
>   -v eval.py


When we launched Spark this way, the Mesos master log shows that the Mesos
master is trying to make the offer back to the framework at port 33978, which
turns out to be a dynamic port. The job failed at this point because it looks
like the offer cannot reach back to the container. In order to expose that
port in the container, we'll need to make it fixed first. Does anyone know how
to make that port fixed in the Spark configuration? Any other advice on how to
launch Spark on Mesos from within a docker container is greatly appreciated.

I1230 12:53:54.758297  9571 master.cpp:2424] Received SUBSCRIBE call
for framework 'eval.py' at
scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
I1230 12:53:54.758608  9571 master.cpp:2500] Subscribing framework
eval.py with checkpointing disabled and capabilities [ GPU_RESOURCES ]
I1230 12:53:54.760036  9569 hierarchical.cpp:271] Added framework
993198d1-7393-4656-9f75-4f22702609d0-0233
I1230 12:53:54.761533  9549 master.cpp:5709] Sending 1 offers to framework
993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at
scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@:33978
E1230 12:53:57.757814  9573 process.cpp:2105] Failed to shutdown
socket with fd 22: Transport endpoint is not connected
I1230 12:53:57.758314  9543 master.cpp:1284] Framework
993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at
scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
disconnected
I1230 12:53:57.758378  9543 master.cpp:2725] Disconnecting framework
993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at
scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
I1230 12:53:57.758411  9543 master.cpp:2749] Deactivating framework
993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at
scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
I1230 12:53:57.758582  9548 hierarchical.cpp:382] Deactivated
framework 993198d1-7393-4656-9f75-4f22702609d0-0233
W1230 12:53:57.758915  9543 master.hpp:2113] Master attempted to send
message to disconnected framework
993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at
scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
I1230 12:53:57.759140  9543 master.cpp:1297] Giving framework
993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at
scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978 0ns
to failover
I1230 12:53:57.760573  9561 master.cpp:5561] Framework failover
timeout, removing framework 993198d1-7393-4656-9f75-4f22702609d0-0233
(eval.py) at scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
I1230 12:53:57.760648  9561 master.cpp:6296] Removing framework
993198d1-7393-4656-9f75-4f22702609d0-0233 (eval.py) at
scheduler-8a94bc86-c2b3-4c7d-bee7-cfddc8e9a8da@172.17.0.12:33978
I1230 12:53:57.761493  9571 hierarchical.cpp:333] Removed framework
993198d1-7393-4656-9f75-4f22702609d0-0233



Re: What's the best practice to load data from RDMS to Spark

2016-12-30 Thread Palash Gupta
Hi,

If you want to load from csv, you can use the procedure below. Of course you need
to define the Spark context first. (The given example loads all csv files under a
folder; you can use a specific name for a single file.)

// these lines are equivalent in Spark 2.0
spark.read.format("csv").option("header", "true").load("../Downloads/*.csv")
spark.read.option("header", "true").csv("../Downloads/*.csv")

Best regards,
Palash Gupta
Sent from Yahoo Mail on Android 
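Since the subject asks about loading from an RDBMS, here is also a minimal Scala sketch using Spark's JDBC data source; the connection URL, credentials and table name are illustrative, and the matching JDBC driver jar must be on the classpath.

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-load-sketch").getOrCreate()

val props = new Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "dbpass")

// Reads the whole table into a DataFrame; for large tables add partitioning
// options (partitionColumn, lowerBound, upperBound, numPartitions).
val employees = spark.read.jdbc("jdbc:mysql://dbhost:3306/hr", "employees", props)
employees.show(5)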
 
  On Fri, 30 Dec, 2016 at 11:39 pm, Raymond Xie wrote:   
Hello,
I am new to Spark. As a SQL developer, I have only taken some courses online and 
spent some time on my own, and have never had a chance to work on a real project.
I wonder what would be the best practice (tool, procedure...) to load data 
(csv, excel) into the Spark platform?
Thank you.


Raymond  


Re: Best way to process lookup ETL with Dataframes

2016-12-30 Thread Nicholas Hakobian
It looks like Spark 1.5 has the coalesce function, which is like NVL, but a
bit more flexible. From Ayan's example you should be able to use:
coalesce(b.col, c.col, 'some default')

If that doesn't have the flexibility you want, you can always use nested
case or if statements, but it's just harder to read.
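For reference, a minimal Spark 1.5-style sketch of the sequential-join-plus-coalesce approach; the toy tables, column names, and the assumption that sc comes from spark-shell or your application are illustrative only.

// sc: SparkContext provided by spark-shell or your application (Spark 1.5 style).
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{coalesce, lit}

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Toy data: a driving table, a primary lookup and a fallback lookup.
val driving = Seq(("1", "a"), ("2", "b"), ("3", "c")).toDF("id", "payload")
val lookup1 = Seq(("1", "from_lookup1")).toDF("id", "col1")
val lookup2 = Seq(("2", "from_lookup2")).toDF("id", "col2")

// Two sequential left outer joins; coalesce picks the first non-null lookup result.
val result = driving
  .join(lookup1, driving("id") === lookup1("id"), "left_outer")
  .join(lookup2, driving("id") === lookup2("id"), "left_outer")
  .withColumn("resolved", coalesce(lookup1("col1"), lookup2("col2"), lit("some default")))
  .select(driving("id"), driving("payload"), $"resolved")

result.show()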

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com



On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike 
wrote:

> Thanks, but is nvl() in Spark 1.5?  I can't find it in spark.sql.functions
> (http://spark.apache.org/docs/1.5.0/api/scala/index.html#
> org.apache.spark.sql.functions$)
>
>
> Reading about the Oracle nvl function, it seems it is similar to the na
> functions.  Not sure it will help though, because what I need is to join
> after the first join fails.
>
> --
> *From:* ayan guha 
> *Sent:* Thursday, December 29, 2016 11:06 PM
> *To:* Sesterhenn, Mike
> *Cc:* user@spark.apache.org
> *Subject:* Re: Best way to process lookup ETL with Dataframes
>
> How about this -
>
> select a.*, nvl(b.col,nvl(c.col,'some default'))
> from driving_table a
> left outer join lookup1 b on a.id=b.id
> left outer join lookup2 c on a.id=c.id
>
> ?
>
> On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike 
> wrote:
>
>> Hi all,
>>
>>
>> I'm writing an ETL process with Spark 1.5, and I was wondering the best
>> way to do something.
>>
>>
>> A lot of the fields I am processing require an algorithm similar to this:
>>
>>
>> Join input dataframe to a lookup table.
>>
>> if (that lookup fails (the joined fields are null)) {
>>
>> Lookup into some other table to join some other fields.
>>
>> }
>>
>>
>> With Dataframes, it seems the only way to do this is to do something like
>> this:
>>
>>
>> Join input dataframe to a lookup table.
>>
>> if (that lookup fails (the joined fields are null)) {
>>
>>*SPLIT the dataframe into two DFs via DataFrame.filter(),
>>
>>   one group with successful lookup, the other failed).*
>>
>>For failed lookup:  {
>>
>>Lookup into some other table to grab some other fields.
>>
>>}
>>
>>*MERGE the dataframe splits back together via DataFrame.unionAll().*
>> }
>>
>>
>> I'm seeing some really large execution plans as you might imagine in the
>> Spark Ui, and the processing time seems way out of proportion with the size
>> of the dataset.  (~250GB in 9 hours).
>>
>>
>> Is this the best approach to implement an algorithm like this?  Note also
>> that some fields I am implementing require multiple staged split/merge
>> steps due to cascading lookup joins.
>>
>>
>> Thanks,
>>
>>
>> *Michael Sesterhenn*
>>
>>
>> *msesterh...@cars.com  *
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


What's the best practice to load data from RDMS to Spark

2016-12-30 Thread Raymond Xie
Hello,

I am new to Spark. As a SQL developer, I have only taken some courses online and
spent some time on my own, and have never had a chance to work on a real project.

I wonder what would be the best practice (tool, procedure...) to load data
(csv, excel) into the Spark platform?

Thank you.



*Raymond*


Re: Spark/Mesos with GPU support

2016-12-30 Thread Stephen Boesch
Would it be possible to share that communication?  I am interested in this
thread.

2016-12-30 11:02 GMT-08:00 Ji Yan :

> Thanks Michael, Tim and I have touched base and thankfully the issue has
> already been resolved
>
> On Fri, Dec 30, 2016 at 9:20 AM, Michael Gummelt 
> wrote:
>
>> I've cc'd Tim and Kevin, who worked on GPU support.
>>
>> On Wed, Dec 28, 2016 at 11:22 AM, Ji Yan  wrote:
>>
>>> Dear Spark Users,
>>>
>>> Has anyone had successful experience running Spark on Mesos with GPU
>>> support? We have a Mesos cluster that can see and offer nvidia GPU
>>> resources. With Spark, it seems that the GPU support with Mesos (
>>> https://github.com/apache/spark/pull/14644) has only recently been
>>> merged into Spark Master which is not found in 2.0.2 release yet. We have a
>>> custom built Spark from 2.1-rc5 which is confirmed to have the above
>>> change. However when we try to run any code from Spark on this Mesos setup,
>>> the spark program hangs and keeps saying
>>>
>>> “WARN TaskSchedulerImpl: Initial job has not accepted any resources;
>>> check your cluster UI to ensure that workers are registered and have
>>> sufficient resources”
>>>
>>> We are pretty sure that the cluster has enough resources as there is
>>> nothing running on it. If we disable the GPU support in configuration and
>>> restart mesos and retry the same program, it would work.
>>>
>>> Any comment/advice on this greatly appreciated
>>>
>>> Thanks,
>>> Ji
>>>
>>>
>>> The information in this email is confidential and may be legally
>>> privileged. It is intended solely for the addressee. Access to this email
>>> by anyone else is unauthorized. If you are not the intended recipient, any
>>> disclosure, copying, distribution or any action taken or omitted to be
>>> taken in reliance on it, is prohibited and may be unlawful.
>>>
>>
>>
>>
>> --
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
>>
>
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>


Re: Spark/Mesos with GPU support

2016-12-30 Thread Ji Yan
Thanks Michael, Tim and I have touched base and thankfully the issue has
already been resolved

On Fri, Dec 30, 2016 at 9:20 AM, Michael Gummelt 
wrote:

> I've cc'd Tim and Kevin, who worked on GPU support.
>
> On Wed, Dec 28, 2016 at 11:22 AM, Ji Yan  wrote:
>
>> Dear Spark Users,
>>
>> Has anyone had successful experience running Spark on Mesos with GPU
>> support? We have a Mesos cluster that can see and offer nvidia GPU
>> resources. With Spark, it seems that the GPU support with Mesos (
>> https://github.com/apache/spark/pull/14644) has only recently been
>> merged into Spark Master which is not found in 2.0.2 release yet. We have a
>> custom built Spark from 2.1-rc5 which is confirmed to have the above
>> change. However when we try to run any code from Spark on this Mesos setup,
>> the spark program hangs and keeps saying
>>
>> “WARN TaskSchedulerImpl: Initial job has not accepted any resources;
>> check your cluster UI to ensure that workers are registered and have
>> sufficient resources”
>>
>> We are pretty sure that the cluster has enough resources as there is
>> nothing running on it. If we disable the GPU support in configuration and
>> restart mesos and retry the same program, it would work.
>>
>> Any comment/advice on this greatly appreciated
>>
>> Thanks,
>> Ji
>>
>>
>> The information in this email is confidential and may be legally
>> privileged. It is intended solely for the addressee. Access to this email
>> by anyone else is unauthorized. If you are not the intended recipient, any
>> disclosure, copying, distribution or any action taken or omitted to be
>> taken in reliance on it, is prohibited and may be unlawful.
>>
>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>



Broadcast destroy

2016-12-30 Thread bryan.jeffrey
All,

If we are updating broadcast variables do we need to manually destroy the 
replaced broadcast, or will they be automatically pruned?

Thank you,

Bryan Jeffrey

Sent from my Windows 10 phone
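For reference, a minimal Scala sketch of one common re-broadcast pattern, in which the previous broadcast is explicitly unpersisted before the new value is broadcast; whether explicit cleanup is strictly required in a given job is left open here, and the names below are illustrative.

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Sketch: swap in a refreshed lookup map, releasing the old broadcast's executor copies.
def refreshBroadcast(sc: SparkContext,
                     old: Option[Broadcast[Map[String, String]]],
                     newValue: Map[String, String]): Broadcast[Map[String, String]] = {
  // unpersist() removes the copies cached on the executors; destroy() would also
  // remove the driver-side data and make the old handle unusable.
  old.foreach(_.unpersist(blocking = false))
  sc.broadcast(newValue)
}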



Re: TallSkinnyQR

2016-12-30 Thread Sean Owen
There are no changes to Spark at all here. See my workaround below.

On Fri, Dec 30, 2016, 17:18 Iman Mohtashemi 
wrote:

> Hi guys,
> Are your changes/bug fixes reflected in the Spark 2.1 release?
> Iman
>
> On Dec 2, 2016 3:03 PM, "Iman Mohtashemi" 
> wrote:
>
> Thanks again! This is very helpful!
> Best regards,
> Iman
>
> On Dec 2, 2016 2:49 PM, "Huamin Li" <3eri...@gmail.com> wrote:
>
> Hi Iman,
>
> You can get my code from https://github.com/hl475/svd/tree/testSVD. In
> addition to fixing the index issue for IndexedRowMatrix (
> https://issues.apache.org/jira/browse/SPARK-8614), I have made the
> following changes as well:
>
> (1) Add tallSkinnySVD and computeSVDbyGram to indexedRowMatrix.
> (2) Add shuffle.scala to mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/
> (you need this if you want to use
> tallSkinnySVD). There was a bug about shuffle method in breeze, and I sent
> the pull request to https://github.com/scalanlp/breeze/pull/571. However,
> the pull request has been merged to breeze 0.13, whereas the version of
> breeze for current Spark is 0.12.
> (3) Add partialSVD to BlockMatrix which computes the randomized singular
> value decomposition of a given BlockMatrix.
>
> The new SVD methods (tallSkinnySVD, computeSVDbyGram, and partialSVD) are
> in beta version right now. You are totally welcome to test it and share the
> feedback with me!
>
> I implemented these codes for my summer intern project with Mark Tygert,
> and we are currently testing the performance of the new codes.
>
> Best,
> Huamin
>
> On Fri, Dec 2, 2016 at 2:07 PM, Iman Mohtashemi  > wrote:
>
> Great thanks! Where can I get the latest with the bug fixes?
> best regards,
> Iman
>
> On Fri, Dec 2, 2016 at 10:54 AM Huamin Li <3eri...@gmail.com> wrote:
>
> Hi,
>
> There seems to be a bug in the section of code that converts the RowMatrix
> format back into indexedRowMatrix format.
>
> For RowMatrix, I think the singular values and right singular vectors
> (not the left singular vectors U) that computeSVD computes are correct when
> using multiple executors/machines; Only the R (not the Q) in tallSkinnyQR
> is correct when using multiple executors/machines. U and Q were being
> stored in RowMatrix format. There is no index information about RowMatrix,
> so it does not make sense for U and Q.
>
> Others have run into this same problem (
> https://issues.apache.org/jira/browse/SPARK-8614)
>
> I think the quick solution for this problem is copy and paste the multiply,
> computeSVD, and tallSkinnyQR code from RowMatrix to IndexedRowMatrix and
> make the corresponding changes although this would result in code
> duplication.
>
> I have fixed the problem by what I mentioned above. Now, multiply,
> computeSVD, and tallSkinnyQR are giving the correct results for
> indexedRowMatrix when using multiple executors or workers. Let me know if
> I should do a pull request for this.
>
> Best,
> Huamin
>
> On Fri, Dec 2, 2016 at 11:23 AM, Iman Mohtashemi <
> iman.mohtash...@gmail.com> wrote:
>
> Ok thanks.
>
> On Fri, Dec 2, 2016 at 8:19 AM Sean Owen  wrote:
>
> I tried, but enforcing the ordering changed a fair bit of behavior and I
> gave up. I think the way to think of it is: a RowMatrix has whatever
> ordering you made it with, so you need to give it ordered rows if you're
> going to use a method like the QR decomposition. That works. I don't think
> the QR method should ever have been on this class though, for this reason.
>
> On Fri, Dec 2, 2016 at 4:13 PM Iman Mohtashemi 
> wrote:
>
> Hi guys,
> Was this bug ever resolved?
> Iman
>
> On Fri, Nov 11, 2016 at 9:59 AM Iman Mohtashemi 
> wrote:
>
> Yes this would be helpful, otherwise the Q part of the decomposition is
> useless. One can use that to solve the system by transposing it and
> multiplying with b and solving for x  (Ax = b) where A = R and b = Qt*b
> since the Upper triangular matrix is correctly available (R)
>
> On Fri, Nov 11, 2016 at 3:56 AM Sean Owen  wrote:
>
> @Xiangrui / @Joseph, do you think it would be reasonable to have
> CoordinateMatrix sort the rows it creates to make an IndexedRowMatrix? in
> order to make the ultimate output of toRowMatrix less surprising when it's
> not ordered?
>
>
> On Tue, Nov 8, 2016 at 3:29 PM Sean Owen  wrote:
>
> I think the problem here is that IndexedRowMatrix.toRowMatrix does *not*
> result in a RowMatrix with rows in order of their indices, necessarily:
>
>
> // Drop its row indices.
> RowMatrix rowMat = indexedRowMatrix.toRowMatrix();
>
> What you get is a matrix where the rows are arranged in whatever order
> they were passed to IndexedRowMatrix. RowMatrix says it's for rows where
> the ordering doesn't matter, but then it's maybe surprising it has a QR
> decomposition method, because clearly the result depends on the order of
> rows in the input. (CC Yuhao Yang for a comment?)
>
> You could say, well, why doesn't IndexedRowMatrix.toRowMatrix return at
> least something with sorted rows? that would not be hard. It 
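
For reference, a minimal sketch of one way to impose that ordering yourself
before calling tallSkinnyQR (not necessarily the exact workaround referred to
above), assuming an existing IndexedRowMatrix named indexedRowMat:

import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Sort the indexed rows by their row index before dropping the indices,
// so the resulting RowMatrix presents rows in the intended order.
val orderedRowMat: RowMatrix = new RowMatrix(
  indexedRowMat.rows
    .sortBy(_.index)   // enforce row order explicitly
    .map(_.vector)
)

// QR on the ordered rows; R is upper triangular and Q matches the sorted order.
val qr = orderedRowMat.tallSkinnyQR(computeQ = true)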

Re: Spark/Mesos with GPU support

2016-12-30 Thread Michael Gummelt
I've cc'd Tim and Kevin, who worked on GPU support.

On Wed, Dec 28, 2016 at 11:22 AM, Ji Yan  wrote:

> Dear Spark Users,
>
> Has anyone had successful experience running Spark on Mesos with GPU
> support? We have a Mesos cluster that can see and offer nvidia GPU
> resources. With Spark, it seems that the GPU support with Mesos (
> https://github.com/apache/spark/pull/14644) has only recently been merged
> into Spark Master which is not found in 2.0.2 release yet. We have a custom
> built Spark from 2.1-rc5 which is confirmed to have the above change.
> However when we try to run any code from Spark on this Mesos setup, the
> spark program hangs and keeps saying
>
> “WARN TaskSchedulerImpl: Initial job has not accepted any resources;
> check your cluster UI to ensure that workers are registered and have
> sufficient resources”
>
> We are pretty sure that the cluster has enough resources as there is
> nothing running on it. If we disable the GPU support in configuration and
> restart mesos and retry the same program, it would work.
>
> Any comment/advice on this greatly appreciated
>
> Thanks,
> Ji
>
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: TallSkinnyQR

2016-12-30 Thread Iman Mohtashemi
Hi guys,
Are your changes/bug fixes reflected in the Spark 2.1 release?
Iman

On Dec 2, 2016 3:03 PM, "Iman Mohtashemi"  wrote:

> Thanks again! This is very helpful!
> Best regards,
> Iman
>
> On Dec 2, 2016 2:49 PM, "Huamin Li" <3eri...@gmail.com> wrote:
>
>> Hi Iman,
>>
>> You can get my code from https://github.com/hl475/svd/tree/testSVD. In
>> addition to fixing the index issue for IndexedRowMatrix (
>> https://issues.apache.org/jira/browse/SPARK-8614), I have made the
>> following changes as well:
>>
>> (1) Add tallSkinnySVD and computeSVDbyGram to indexedRowMatrix.
>> (2) Add shuffle.scala to mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/
>> (you need this if you want to use
>> tallSkinnySVD). There was a bug about shuffle method in breeze, and I sent
>> the pull request to https://github.com/scalanlp/breeze/pull/571.
>> However, the pull request has been merged to breeze 0.13, whereas the
>> version of breeze for current Spark is 0.12.
>> (3) Add partialSVD to BlockMatrix which computes the randomized singular
>> value decomposition of a given BlockMatrix.
>>
>> The new SVD methods (tallSkinnySVD, computeSVDbyGram, and partialSVD) are
>> in beta version right now. You are totally welcome to test it and share the
>> feedback with me!
>>
>> I implemented these codes for my summer intern project with Mark Tygert,
>> and we are currently testing the performance of the new codes.
>>
>> Best,
>> Huamin
>>
>> On Fri, Dec 2, 2016 at 2:07 PM, Iman Mohtashemi <
>> iman.mohtash...@gmail.com> wrote:
>>
>>> Great thanks! Where can I get the latest with the bug fixes?
>>> best regards,
>>> Iman
>>>
>>> On Fri, Dec 2, 2016 at 10:54 AM Huamin Li <3eri...@gmail.com> wrote:
>>>
 Hi,

 There seems to be a bug in the section of code that converts the
 RowMatrix format back into indexedRowMatrix format.

 For RowMatrix, I think the singular values and right singular vectors
 (not the left singular vectors U) that computeSVD computes are correct when
 using multiple executors/machines; Only the R (not the Q) in tallSkinnyQR
 is correct when using multiple executors/machines. U and Q were being
 stored in RowMatrix format. There is no index information about RowMatrix,
 so it does not make sense for U and Q.

 Others have run into this same problem (https://issues.apache.org/jira/browse/SPARK-8614)

 I think the quick solution for this problem is copy and paste the multiply,
 computeSVD, and tallSkinnyQR code from RowMatrix to IndexedRowMatrix
 and make the corresponding changes although this would result in code
 duplication.

 I have fixed the problem by what I mentioned above. Now, multiply,
 computeSVD, and tallSkinnyQR are giving the correct results for
 indexedRowMatrix when using multiple executors or workers. Let me know
 if I should do a pull request for this.

 Best,
 Huamin

 On Fri, Dec 2, 2016 at 11:23 AM, Iman Mohtashemi <
 iman.mohtash...@gmail.com> wrote:

 Ok thanks.

 On Fri, Dec 2, 2016 at 8:19 AM Sean Owen  wrote:

 I tried, but enforcing the ordering changed a fair bit of behavior and
 I gave up. I think the way to think of it is: a RowMatrix has whatever
 ordering you made it with, so you need to give it ordered rows if you're
 going to use a method like the QR decomposition. That works. I don't think
 the QR method should ever have been on this class though, for this reason.

 On Fri, Dec 2, 2016 at 4:13 PM Iman Mohtashemi <
 iman.mohtash...@gmail.com> wrote:

 Hi guys,
 Was this bug ever resolved?
 Iman

 On Fri, Nov 11, 2016 at 9:59 AM Iman Mohtashemi <
 iman.mohtash...@gmail.com> wrote:

 Yes this would be helpful, otherwise the Q part of the decomposition is
 useless. One can use that to solve the system by transposing it and
 multiplying with b and solving for x  (Ax = b) where A = R and b = Qt*b
 since the Upper triangular matrix is correctly available (R)

 On Fri, Nov 11, 2016 at 3:56 AM Sean Owen  wrote:

 @Xiangrui / @Joseph, do you think it would be reasonable to have
 CoordinateMatrix sort the rows it creates to make an IndexedRowMatrix? in
 order to make the ultimate output of toRowMatrix less surprising when it's
 not ordered?


 On Tue, Nov 8, 2016 at 3:29 PM Sean Owen  wrote:

 I think the problem here is that IndexedRowMatrix.toRowMatrix does
 *not* result in a RowMatrix with rows in order of their indices,
 necessarily:


 // Drop its row indices.
 RowMatrix rowMat = indexedRowMatrix.toRowMatrix();

 What you get is a matrix where the rows are arranged in whatever order
 they were passed to IndexedRowMatrix. RowMatrix says it's for rows where
 the ordering doesn't matter, but then it's maybe surprising it has a QR
 decomposition method, because clearl

Re: Best way to process lookup ETL with Dataframes

2016-12-30 Thread Sesterhenn, Mike
Thanks, but is nvl() in Spark 1.5?  I can't find it in spark.sql.functions 
(http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)


Reading about the Oracle nvl function, it seems it is similar to the na 
functions.  Not sure it will help though, because what I need is to join after 
the first join fails.
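
For what it's worth, the DataFrame equivalent of nvl here would be coalesce from
org.apache.spark.sql.functions, which should be available in 1.5. A minimal
sketch of the chained left-outer-join approach, assuming hypothetical driving,
lookup1 and lookup2 DataFrames that each carry id and col columns:

import org.apache.spark.sql.functions.{coalesce, lit}

// Chain both lookups as left outer joins, then take the first non-null hit;
// this avoids splitting the DataFrame and unioning it back together.
val resolved = driving
  .join(lookup1, driving("id") === lookup1("id"), "left_outer")
  .join(lookup2, driving("id") === lookup2("id"), "left_outer")
  .select(
    driving("id"),
    coalesce(lookup1("col"), lookup2("col"), lit("some default")).as("col")
  )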


From: ayan guha 
Sent: Thursday, December 29, 2016 11:06 PM
To: Sesterhenn, Mike
Cc: user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

How about this -

select a.*, nvl(b.col,nvl(c.col,'some default'))
from driving_table a
left outer join lookup1 b on a.id=b.id
left outer join lookup2 c on a.id=c.id

?

On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike 
mailto:msesterh...@cars.com>> wrote:

Hi all,


I'm writing an ETL process with Spark 1.5, and I was wondering the best way to 
do something.


A lot of the fields I am processing require an algorithm similar to this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

Lookup into some other table to join some other fields.

}


With Dataframes, it seems the only way to do this is to do something like this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

   *SPLIT the dataframe into two DFs via DataFrame.filter()

  (one group with a successful lookup, the other failed).*

   For failed lookup:  {

   Lookup into some other table to grab some other fields.

   }

   *MERGE the dataframe splits back together via DataFrame.unionAll().*

}


I'm seeing some really large execution plans as you might imagine in the Spark
UI, and the processing time seems way out of proportion with the size of the
dataset (~250GB in 9 hours).


Is this the best approach to implement an algorithm like this?  Note also that 
some fields I am implementing require multiple staged split/merge steps due to 
cascading lookup joins.


Thanks,


Michael Sesterhenn

msesterh...@cars.com




--
Best Regards,
Ayan Guha


Broadcast Join and Inner Join giving different result on same DataFrame

2016-12-30 Thread titli batali
Hi,

I have two dataframes which have a common column Product_Id on which I have to
perform a join operation.

val transactionDF = readCSVToDataFrame(sqlCtx: SQLContext,
pathToReadTransactions: String, transactionSchema: StructType)
val productDF = readCSVToDataFrame(sqlCtx: SQLContext,
pathToReadProduct:String, productSchema: StructType)

As the transaction data is very large but the product data is small, I would
ideally do a broadcast join where I broadcast productDF.

 val productBroadcastDF = broadcast(productDF)
 val broadcastJoin = transactionDF.join(productBroadcastDF, "productId")

Or simply, val innerJoin = transactionDF.join(productDF, "productId")
should give the same result as above.

But if I join using a simple inner join I get a dataframe with joined values,
whereas if I do a broadcast join I get an empty dataframe. I am not able to
explain this behavior; ideally both should give the same result.

What could have gone wrong? Has anyone faced a similar issue?


Thanks,
Prateek
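
A minimal diagnostic sketch, assuming the DataFrames above: comparing the
physical plans and row counts of the two variants is usually the quickest way
to see where they diverge.

import org.apache.spark.sql.functions.broadcast

val innerJoin     = transactionDF.join(productDF, "productId")
val broadcastJoin = transactionDF.join(broadcast(productDF), "productId")

// Both plans should join on the same key; the broadcast variant should show
// a BroadcastHashJoin instead of a shuffle-based join.
innerJoin.explain()
broadcastJoin.explain()

// If the joins are equivalent, the counts must match.
println(s"inner: ${innerJoin.count()}, broadcast: ${broadcastJoin.count()}")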


Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-30 Thread Palash Gupta
Hi Marco & Ayan,
I now have a clearer idea of what Marco means by "reduce". I will use it to dig
down.

Let me answer your queries:

When you see the broadcast errors, does your job terminate?
Palash>> Yes, it terminated the app.

Or are you assuming that something is wrong just because you see the message in
the logs?
Palash>> No, it terminated at the very first step of Spark processing (in my
case, loading CSV from HDFS).

Plus... w.r.t. logic: Who writes the CSV? With what frequency?
Palash>> We parse XML files using Python (not in Spark's scope), build CSVs and
put them in HDFS.

Does the app run all the time loading CSV from Hadoop?
Palash>> Every hour two separate PySpark apps are running:
1. Load the current expected hour's data, prepare KPIs, do aggregation, load
into MongoDB.
2. The same operation runs for the delayed hour's data.

Are you using Spark Streaming?
Palash>> No.

Does the app run fine with an older version of Spark (1.6)?
Palash>> I didn't test with Spark 1.6. My app has been running well since I
stopped the second app (delayed data loading) two days ago. Even before that,
both ran well in most cases, except a few times...

Sent from Yahoo Mail on Android 
 
  On Fri, 30 Dec, 2016 at 4:57 pm, Marco Mistroni wrote:   
Correct. I mean reduce the functionality.
Uhm I realised I didn't ask u a fundamental question. When you see the
broadcast errors, does your job terminate? Or are you assuming that something
is wrong just because you see the message in the logs?
Plus... Wrt logic: Who writes the CSV? With what frequency?
Does it app run all the time loading CSV from hadoop?
Are you using spark streaming?
Does it app run fine with an older version of spark (1.6)
Hth
On 30 Dec 2016 12:44 pm, "ayan guha"  wrote:

@Palash: I think what Macro meant by "reduce functionality" is to reduce scope 
of your application's functionality so that you can isolate the issue in 
certain part(s) of the app...I do not think he meant "reduce" operation :)
On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta  wrote:

Hi Marco,
All of your suggestions are highly appreciated, whatever you said so far. I 
would apply to implement in my code and let you know. 

Let me answer your query:
What does your program do? 
Palash>> In each hour I am loading many CSV files and then I'm making some 
KPI(s) out of them. Finally I am doing some aggregation and inserting into 
mongodb from spark. 

 you say it runs for 2-3 hours, what is the logic? just processing a huge 
amount of data? doing ML ?Palash>> Yes you are right whatever I'm processing it 
should not take much time. Initially my processing was taking only 5 minutes as 
I was using all cores running only one application. When I created more 
separate spark applications for handling delayed data loading and implementing 
more use cases with parallel run, I started facing the error randomly. And due 
to separate resource distribution among four parallel spark application to run 
in parallel now some task is taking longer time than usual. But still it should 
not take 2-3 hours time...

Currently whole applications are running in a development environment where we 
have only two VM cluster and I will migrate to production platform by next 
week. I will let you know if there is any improvement over there. 

 I'd say break down your application..  reduce functionality , run and see 
outcome. then add more functionality, run and see again.
Palash>> Macro as I'm not very good in Spark. It would be helpful for me if you 
provide some example of reduce functionality. Cause I'm using Spark data frame, 
join data frames, use SQL statement to manipulate KPI(s). Here How could I 
apply reduce functionality?


 Thanks & Best Regards,
Palash Gupta


  From: Marco Mistroni 
 To: "spline_pal...@yahoo.com"  
Cc: User 
 Sent: Thursday, December 29, 2016 11:28 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed 
to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"
   
Hello
 no sorry i dont have any further insight into that i have seen similar 
errors but for completely different issues, and in most of hte cases it had to 
do with my data or my processing rather than Spark itself.
What does your program do? you say it runs for 2-3 hours, what is the logic? 
just processing a huge amount of data?
doing ML ?
i'd say break down your application..  reduce functionality , run and see 
outcome. then add more functionality, run and see again.
I found myself doing htese kinds of things when i got errors in my spark apps.

To get a concrete help you will have to trim down the code to a few lines that 
can reproduces the error  That will be a great start

Sorry for not being of much help

hth
 marco





On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta  wrote:

Hi Marco,
Thanks for your response.
Yes I tested it before & am able to load from linux filesystem and it also 
sometimes have similar issue.
However in both cases (either from hadoop or linux file system), this error 
comes in some specific s

Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-30 Thread Marco Mistroni
Correct. I mean reduce the functionality.
Uhm, I realised I didn't ask you a fundamental question. When you see the
broadcast errors, does your job terminate? Or are you assuming that
something is wrong just because you see the message in the logs?
Plus... w.r.t. the logic: Who writes the CSV? With what frequency?
Does the app run all the time loading CSV from Hadoop?
Are you using Spark Streaming?
Does the app run fine with an older version of Spark (1.6)?
Hth

On 30 Dec 2016 12:44 pm, "ayan guha"  wrote:

> @Palash: I think what Macro meant by "reduce functionality" is to reduce
> scope of your application's functionality so that you can isolate the issue
> in certain part(s) of the app...I do not think he meant "reduce" operation
> :)
>
> On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta  invalid> wrote:
>
>> Hi Marco,
>>
>> All of your suggestions are highly appreciated, whatever you said so far.
>> I would apply to implement in my code and let you know.
>>
>> Let me answer your query:
>>
>> What does your program do?
>> Palash>> In each hour I am loading many CSV files and then I'm making
>> some KPI(s) out of them. Finally I am doing some aggregation and inserting
>> into mongodb from spark.
>>
>> you say it runs for 2-3 hours, what is the logic? just processing a huge
>> amount of data? doing ML ?
>> Palash>> Yes you are right whatever I'm processing it should not take
>> much time. Initially my processing was taking only 5 minutes as I was using
>> all cores running only one application. When I created more separate spark
>> applications for handling delayed data loading and implementing more use
>> cases with parallel run, I started facing the error randomly. And due to
>> separate resource distribution among four parallel spark application to run
>> in parallel now some task is taking longer time than usual. But still it
>> should not take 2-3 hours time...
>>
>> Currently whole applications are running in a development environment
>> where we have only two VM cluster and I will migrate to production platform
>> by next week. I will let you know if there is any improvement over there.
>>
>> I'd say break down your application..  reduce functionality , run and see
>> outcome. then add more functionality, run and see again.
>>
>> Palash>> Macro as I'm not very good in Spark. It would be helpful for me
>> if you provide some example of reduce functionality. Cause I'm using Spark
>> data frame, join data frames, use SQL statement to manipulate KPI(s). Here
>> How could I apply reduce functionality?
>>
>>
>>
>> Thanks & Best Regards,
>> Palash Gupta
>>
>>
>> --
>> *From:* Marco Mistroni 
>> *To:* "spline_pal...@yahoo.com" 
>> *Cc:* User 
>> *Sent:* Thursday, December 29, 2016 11:28 PM
>>
>> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying
>> "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"
>>
>> Hello
>>  no sorry i dont have any further insight into that i have seen
>> similar errors but for completely different issues, and in most of hte
>> cases it had to do with my data or my processing rather than Spark itself.
>> What does your program do? you say it runs for 2-3 hours, what is the
>> logic? just processing a huge amount of data?
>> doing ML ?
>> i'd say break down your application..  reduce functionality , run and see
>> outcome. then add more functionality, run and see again.
>> I found myself doing htese kinds of things when i got errors in my spark
>> apps.
>>
>> To get a concrete help you will have to trim down the code to a few lines
>> that can reproduces the error  That will be a great start
>>
>> Sorry for not being of much help
>>
>> hth
>>  marco
>>
>>
>>
>>
>>
>> On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta 
>> wrote:
>>
>> Hi Marco,
>>
>> Thanks for your response.
>>
>> Yes I tested it before & am able to load from linux filesystem and it
>> also sometimes have similar issue.
>>
>> However in both cases (either from hadoop or linux file system), this
>> error comes in some specific scenario as per my observations:
>>
>> 1. When two parallel spark separate application is initiated from one
>> driver (not all the time, sometime)
>> 2. If one spark jobs are running for more than expected hour let say 2-3
>> hours, the second application terminated giving the error.
>>
>> To debug the problem for me it will be good if you can share some
>> possible reasons why failed to broadcast error may come.
>>
>> Or if you need more logs I can share.
>>
>> Thanks again Spark User Group.
>>
>> Best Regards
>> Palash Gupta
>>
>>
>>
>> Sent from Yahoo Mail on Android
>> 
>>
>> On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni
>>  wrote:
>> Hi
>>  Pls try to read a CSV from filesystem instead of hadoop. If you can read
>> it successfully then your hadoop file is the issue and you can start
>> debugging from there.
>> Hth
>>
>> On 29 Dec 2016 6:26 am, "Palash Gupta" > invalid> wrote:
>>
>> Hi A

Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-30 Thread ayan guha
@Palash: I think what Macro meant by "reduce functionality" is to reduce
scope of your application's functionality so that you can isolate the issue
in certain part(s) of the app...I do not think he meant "reduce" operation
:)

On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta <
spline_pal...@yahoo.com.invalid> wrote:

> Hi Marco,
>
> All of your suggestions are highly appreciated, whatever you said so far.
> I would apply to implement in my code and let you know.
>
> Let me answer your query:
>
> What does your program do?
> Palash>> In each hour I am loading many CSV files and then I'm making some
> KPI(s) out of them. Finally I am doing some aggregation and inserting into
> mongodb from spark.
>
> you say it runs for 2-3 hours, what is the logic? just processing a huge
> amount of data? doing ML ?
> Palash>> Yes you are right whatever I'm processing it should not take much
> time. Initially my processing was taking only 5 minutes as I was using all
> cores running only one application. When I created more separate spark
> applications for handling delayed data loading and implementing more use
> cases with parallel run, I started facing the error randomly. And due to
> separate resource distribution among four parallel spark application to run
> in parallel now some task is taking longer time than usual. But still it
> should not take 2-3 hours time...
>
> Currently whole applications are running in a development environment
> where we have only two VM cluster and I will migrate to production platform
> by next week. I will let you know if there is any improvement over there.
>
> I'd say break down your application..  reduce functionality , run and see
> outcome. then add more functionality, run and see again.
>
> Palash>> Macro as I'm not very good in Spark. It would be helpful for me
> if you provide some example of reduce functionality. Cause I'm using Spark
> data frame, join data frames, use SQL statement to manipulate KPI(s). Here
> How could I apply reduce functionality?
>
>
>
> Thanks & Best Regards,
> Palash Gupta
>
>
> --
> *From:* Marco Mistroni 
> *To:* "spline_pal...@yahoo.com" 
> *Cc:* User 
> *Sent:* Thursday, December 29, 2016 11:28 PM
>
> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying
> "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"
>
> Hello
>  no sorry i dont have any further insight into that i have seen
> similar errors but for completely different issues, and in most of hte
> cases it had to do with my data or my processing rather than Spark itself.
> What does your program do? you say it runs for 2-3 hours, what is the
> logic? just processing a huge amount of data?
> doing ML ?
> i'd say break down your application..  reduce functionality , run and see
> outcome. then add more functionality, run and see again.
> I found myself doing htese kinds of things when i got errors in my spark
> apps.
>
> To get a concrete help you will have to trim down the code to a few lines
> that can reproduces the error  That will be a great start
>
> Sorry for not being of much help
>
> hth
>  marco
>
>
>
>
>
> On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta 
> wrote:
>
> Hi Marco,
>
> Thanks for your response.
>
> Yes I tested it before & am able to load from linux filesystem and it also
> sometimes have similar issue.
>
> However in both cases (either from hadoop or linux file system), this
> error comes in some specific scenario as per my observations:
>
> 1. When two parallel spark separate application is initiated from one
> driver (not all the time, sometime)
> 2. If one spark jobs are running for more than expected hour let say 2-3
> hours, the second application terminated giving the error.
>
> To debug the problem for me it will be good if you can share some possible
> reasons why failed to broadcast error may come.
>
> Or if you need more logs I can share.
>
> Thanks again Spark User Group.
>
> Best Regards
> Palash Gupta
>
>
>
> Sent from Yahoo Mail on Android
> 
>
> On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni
>  wrote:
> Hi
>  Pls try to read a CSV from filesystem instead of hadoop. If you can read
> it successfully then your hadoop file is the issue and you can start
> debugging from there.
> Hth
>
> On 29 Dec 2016 6:26 am, "Palash Gupta" 
> wrote:
>
> Hi Apache Spark User team,
>
>
>
> Greetings!
>
> I started developing an application using Apache Hadoop and Spark using
> python. My pyspark application randomly terminated saying "Failed to get
> broadcast_1*" and I have been searching for suggestion and support in
> Stakeoverflow at Failed to get broadcast_1_piece0 of broadcast_1 in
> pyspark application
> 
>
>
> Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application
> I was building an application on Apache Spark 2.00 with Pyth

Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-30 Thread Palash Gupta
Hi Marco,
All of your suggestions are highly appreciated, whatever you said so far. I
will apply them in my code and let you know.

Let me answer your query:

What does your program do?
Palash>> In each hour I am loading many CSV files and then making some KPI(s)
out of them. Finally I am doing some aggregation and inserting the results into
MongoDB from Spark.

You say it runs for 2-3 hours, what is the logic? Just processing a huge amount
of data? Doing ML?
Palash>> Yes, you are right: whatever I'm processing should not take much time.
Initially my processing was taking only 5 minutes, as I was using all cores to
run only one application. When I created more separate Spark applications for
handling delayed data loading and implementing more use cases in parallel, I
started facing the error randomly. And because the resources are now split
among four parallel Spark applications, some tasks take longer than usual. But
still it should not take 2-3 hours...

Currently the whole application is running in a development environment where
we have only a two-VM cluster, and I will migrate to the production platform by
next week. I will let you know if there is any improvement there.

I'd say break down your application.. reduce functionality, run and see
outcome. Then add more functionality, run and see again.
Palash>> Marco, as I'm not very good in Spark, it would be helpful if you could
provide some example of reducing functionality. I'm using Spark data frames,
joining data frames, and using SQL statements to manipulate KPI(s). How could I
apply "reduce functionality" here?


 Thanks & Best Regards,
Palash Gupta


  From: Marco Mistroni 
 To: "spline_pal...@yahoo.com"  
Cc: User 
 Sent: Thursday, December 29, 2016 11:28 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed 
to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"
   
Hello
 no sorry i dont have any further insight into that i have seen similar 
errors but for completely different issues, and in most of hte cases it had to 
do with my data or my processing rather than Spark itself.
What does your program do? you say it runs for 2-3 hours, what is the logic? 
just processing a huge amount of data?
doing ML ?
i'd say break down your application..  reduce functionality , run and see 
outcome. then add more functionality, run and see again.
I found myself doing htese kinds of things when i got errors in my spark apps.

To get a concrete help you will have to trim down the code to a few lines that 
can reproduces the error  That will be a great start

Sorry for not being of much help

hth
 marco





On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta  wrote:

Hi Marco,
Thanks for your response.
Yes I tested it before & am able to load from linux filesystem and it also 
sometimes have similar issue.
However in both cases (either from hadoop or linux file system), this error 
comes in some specific scenario as per my observations:
1. When two separate parallel Spark applications are initiated from one driver
(not all the time, only sometimes).
2. If one Spark job runs for more than the expected time, say 2-3 hours, the
second application terminates giving the error.
To debug the problem for me it will be good if you can share some possible 
reasons why failed to broadcast error may come.
Or if you need more logs I can share.
Thanks again Spark User Group.
Best Regards
Palash Gupta


Sent from Yahoo Mail on Android 
 
 On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni wrote:  
Hi,
Pls try to read a CSV from filesystem instead of hadoop. If you can read it
successfully then your hadoop file is the issue and you can start debugging
from there.
Hth
On 29 Dec 2016 6:26 am, "Palash Gupta"  wrote:

Hi Apache Spark User team,


Greetings!
I started developing an application using Apache Hadoop and Spark using python. 
My pyspark application randomly terminated saying "Failed to get broadcast_1*" 
and I have been searching for suggestion and support in Stakeoverflow at Failed 
to get broadcast_1_piece0 of broadcast_1 in pyspark application

  
[Link preview: "Failed to get broadcast_1_piece0 of broadcast_1 in pyspark
application" -- I was building an application on Apache Spark 2.00 with Python
3.4 and trying to load some CSV files from HDFS (...)]

Could you please provide suggestion registering myself in Apache User list or 
how can I get suggestion or support to debug the problem I am facing?

Your response will be highly appreciated. 


 Thanks & Best Regards,
Engr. Palash Gupta
WhatsApp/Viber: +8801817181502
Skype: palash2494


   
  




   

Re: Spark Partitioning Strategy with Parquet

2016-12-30 Thread titli batali
Yeah, it works for me.

Thanks

On Fri, Nov 18, 2016 at 3:08 AM, ayan guha  wrote:

> Hi
>
> I think you can use map reduce paradigm here. Create a key  using user ID
> and date and record as a value. Then you can express your operation (do
> something) part as a function. If the function meets certain criteria such
> as associative and cumulative like, say Add or multiplication, you can use
> reducebykey, else you may use groupbykey.
>
> HTH
> On 18 Nov 2016 06:45, "titli batali"  wrote:
>
>>
>> That would help, but then within a particular partition I would still need to
>> iterate over the customers whose user ids share the first n letters in that
>> partition. I want to get rid of nested iterations.
>>
>> Thanks
>>
>> On Thu, Nov 17, 2016 at 10:28 PM, Xiaomeng Wan 
>> wrote:
>>
>>> You can partitioned on the first n letters of userid
>>>
>>> On 17 November 2016 at 08:25, titli batali 
>>> wrote:
>>>
 Hi,

 I have a use case, where we have 1000 csv files with a column user_Id,
 having 8 million unique users. The data contains: userid,date,transaction,
 where we run some queries.

 We have a case where we need to iterate for each transaction in a
 particular date for each user. There are three nested loops:

 for(user){
   for(date){
 for(transactions){
   //Do Something
   }
}
 }

 i.e we do similar thing for every (date,transaction) tuple for a
 particular user. In order to get away with loop structure and decrease the
 processing time, we are converting the csv files to parquet and partitioning
 by userid: df.write.format("parquet").partitionBy("useridcol").save("hdfs://path").

 So that while reading the parquet files, we read a particular user in a
 particular partition and create a Cartesian product of (date X transaction)
 and work on the tuple in each partition, to achieve the above level of
 nesting. Is partitioning on 8 million users a bad option? What could be
 a better way to achieve this?

 Thanks



>>>
>>>
>>
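
For the archive, a minimal sketch of the keyed approach ayan describes above,
assuming an existing SparkContext named sc; the Txn case class and the sample
records are purely illustrative, and the aggregation (a sum) is associative and
commutative:

// Key each record by (userId, date) instead of nesting loops over
// users, dates and transactions.
case class Txn(userId: String, date: String, amount: Double)

val txns = sc.parallelize(Seq(
  Txn("u1", "2016-12-01", 10.0),
  Txn("u1", "2016-12-01", 5.0),
  Txn("u2", "2016-12-02", 7.5)
))

// reduceByKey suits associative/commutative per-key operations, e.g. sums.
val totalsPerUserDate = txns
  .map(t => ((t.userId, t.date), t.amount))
  .reduceByKey(_ + _)

// For arbitrary per-(user, date) logic, group the values instead.
val grouped = txns
  .map(t => ((t.userId, t.date), t))
  .groupByKey()
  .mapValues(_.toSeq)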


Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-30 Thread Palash Gupta
Hi Nicholas,
I appreciate your response.

I understand your point, will implement it, and will let you know the
status of the problem.
Sample:
// these lines are equivalent in Spark 2.0
spark.read.format("csv").option("header", "true").load("../Downloads/*.csv")
spark.read.option("header", "true").csv("../Downloads/*.csv")

Thanks & Best Regards,
Palash Gupta


  From: Nicholas Hakobian 
 To: "spline_pal...@yahoo.com"  
Cc: Marco Mistroni ; User 
 Sent: Thursday, December 29, 2016 10:39 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed 
to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"
   
If you are using spark 2.0 (as listed in the stackoverflow post) why are you 
using the external CSV module from Databricks? Spark 2.0 includes the 
functionality from this external module natively, and its possible you are 
mixing an older library with a newer spark which could explain a crash.

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Thu, Dec 29, 2016 at 4:00 AM, Palash Gupta  
wrote:

Hi Marco,
Thanks for your response.
Yes I tested it before & am able to load from linux filesystem and it also 
sometimes have similar issue.
However in both cases (either from hadoop or linux file system), this error 
comes in some specific scenario as per my observations:
1. When two separate parallel Spark applications are initiated from one driver
(not all the time, only sometimes).
2. If one Spark job runs for more than the expected time, say 2-3 hours, the
second application terminates giving the error.
To debug the problem for me it will be good if you can share some possible 
reasons why failed to broadcast error may come.
Or if you need more logs I can share.
Thanks again Spark User Group.
Best Regards
Palash Gupta


Sent from Yahoo Mail on Android 
 
 On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni wrote:  
Hi,
Pls try to read a CSV from filesystem instead of hadoop. If you can read it
successfully then your hadoop file is the issue and you can start debugging
from there.
Hth
On 29 Dec 2016 6:26 am, "Palash Gupta"  wrote:

Hi Apache Spark User team,


Greetings!
I started developing an application using Apache Hadoop and Spark using python. 
My pyspark application randomly terminated saying "Failed to get broadcast_1*" 
and I have been searching for suggestion and support in Stakeoverflow at Failed 
to get broadcast_1_piece0 of broadcast_1 in pyspark application

  
[Link preview: "Failed to get broadcast_1_piece0 of broadcast_1 in pyspark
application" -- I was building an application on Apache Spark 2.00 with Python
3.4 and trying to load some CSV files from HDFS (...)]

Could you please provide suggestion registering myself in Apache User list or 
how can I get suggestion or support to debug the problem I am facing?

Your response will be highly appreciated. 


 Thanks & Best Regards,
Engr. Palash Gupta
WhatsApp/Viber: +8801817181502
Skype: palash2494