Re: Should it be safe to embed Spark in Local Mode?

2016-07-19 Thread Holden Karau
That's interesting and might be better suited to the dev list. I know in
some cases System exit off -1 were added so the task would be marked as
failure.

On Tuesday, July 19, 2016, Brett Randall  wrote:

> This question is regarding
> https://issues.apache.org/jira/browse/SPARK-15685 (StackOverflowError
> (VirtualMachineError) or NoClassDefFoundError (LinkageError) should not
> System.exit() in local mode) and hopes to draw attention-to and
> discussion-on that issue.
>
> I have a product that is hosted as a microservice, running in a
> web-container e.g. Jetty, as a long-running service, publishing a REST
> API.  For small-computations, to reduce latency, I wish to run Spark in
> local mode.  For larger jobs the service might launch a remote job on a
> cluster e.g. Spark-on-YARN.  Either way, there may be custom modules
> deployed to the service from time-to-time, involving third-part libraries
> etc.
>
> My concern is as outlined in SPARK-15685.  If I have a third-party
> library, and either direct or transient dependencies are not satisfied,
> when the code is deployed and run I might suffer a NoClassDefFoundError.
> Or there may be some broken logic leading to a StackOverflowError
> (VirtualMachineError).  Normally if this occurred in a plan
> microservice/web-application, the thread handling the request would see the
> unchecked Throwable/Error and fail, but otherwise the service continues.
>
> With Spark in local mode, due to the quite-specific categorization and
> handling of the aforementioned specific Throwable/Error types (ref 
> Utils.isFatalError
> and other Scala definitions), the result when they are thrown is that Spark
> deems that the JVM should be forcibly shutdown via System.exit(), thereby
> killing the microservice.
>
> Is it reasonable that in the face of the above Errors occuring, we should
> ask that Spark does not exit the JVM, instead allowing some exception or
> error to be thrown? The System.exit() approach seems aligned with the idea
> of a command-line job batch and a quick-exit of the entire JVM and any
> running threads, but it is poorly suited to running in local mode in a
> microservice.
>
> Thoughts?
>
> Thanks,
> Brett
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-19 Thread Deepak Sharma
I am using DAO in spark application to write the final computation to
Cassandra  and it performs well.
What kinds of issues you foresee using DAO for hbase ?

Thanks
Deepak

On 19 Jul 2016 10:04 pm, "Yu Wei"  wrote:

> Hi guys,
>
>
> I write spark application and want to store results generated by spark
> application to hbase.
>
> Do I need to access hbase via java api directly?
>
> Or is it better choice to use DAO similar as traditional RDBMS?  I suspect
> that there is major performance downgrade and other negative impacts using
> DAO. However, I have little knowledge in this field.
>
>
> Any advice?
>
>
> Thanks,
>
> Jared
>
>
>
>


Extremely slow shuffle writes and large job time fluxuations

2016-07-19 Thread Jon Chase
I'm running into an issue with a pyspark job where I'm sometimes seeing
extremely variable job times (20min to 2hr) and very long shuffle times
(e.g. ~2 minutes for 18KB/86 records).

Cluster set up is Amazon EMR 4.4.0, Spark 1.6.0, an m4.2xl driver and a
single m4.10xlarge (40 vCPU, 160GB) executor with the following params:

--conf spark.driver.memory=10G --conf spark.default.parallelism=160 --conf
spark.driver.maxResultSize=4g --num-executors 2 --executor-cores 20
--executor-memory 67G

What's odd is that sometimes the job will run in 20 minutes and sometimes
it will take 2 hours - both jobs with the same data.  I'm using RDDs (not
DataFrames).  There's plenty of RAM, I've looked at the GC logs (using CMS)
and they look fine.  The job reads some data from files, does some
maps/filters/joins/etc; nothing too special.

The only thing I've noticed that looks odd is that the slow instances of
the job have unusually long Shuffle Write times for some tasks.  For
example, a .join operation has ~30 tasks out of 320 that take 2.5 minutes,
GC time of 0.1 seconds, Shuffle Read Size / Records of 12KB/30, and, most
interestingly, Write Time of 2.5 minutes for Shuffle Write Size / Records
of 18KB/86 records.  When looking at the event time line for the stage it's
almost all yellow (Shuffle Write).

We've been running this job on a difference EMR cluster topology (12
m3.2xlarge's) and have not seen the slow down described above.  We've only
observed it on the m4.10xl machine.

It might be worth mentioning again that this is pyspark and no DataFrames
(just RDDs).  When I run 'top' I sometimes see lots (e.g. 60 or 70) python
processes on the executor (I assume one per partition being processed?).

It seems like this has something to do with the single m4.10xl set up, as
we haven't seen this behavior on the 12 m3.2xl cluster.

What I really don't understand is why the job seems to run fine (20
minutes) for a while, and then (for the same data) takes so much longer (2
hours), and with such long shuffle write times.


Re: Storm HDFS bolt equivalent in Spark Streaming.

2016-07-19 Thread Deepak Sharma
In spark streaming , you have to decide the duration of micro batches to
run.
Once you get the micro batch , transform it as per your logic and then you
can use saveAsTextFiles on your final RDD to write it to HDFS.

Thanks
Deepak

On 20 Jul 2016 9:49 am,  wrote:

*Dell - Internal Use - Confidential *

*Dell - Internal Use - Confidential *

While writing to Kafka from Storm, the hdfs bolt provides a nice way to
batch the messages , rotate files, file name convention etc as shown below.



Do you know of something similar in Spark Streaming or do we have to roll
our own? If anyone attempted this can you throw some pointers.



Every other streaming solution like Flume and NIFI handle logic like below.



https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.6/bk_storm-user-guide/content/writing-data-with-storm-hdfs-connector.html



// use "|" instead of "," for field delimiter

RecordFormat format = new DelimitedRecordFormat()

.withFieldDelimiter("|");



// Synchronize the filesystem after every 1000 tuples

SyncPolicy syncPolicy = new CountSyncPolicy(1000);



// Rotate data files when they reach 5 MB

FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f,
Units.MB);



// Use default, Storm-generated file names

FileNameFormat fileNameFormat = new DefaultFileNameFormat()

.withPath("/foo/");





// Instantiate the HdfsBolt

HdfsBolt bolt = new HdfsBolt()

.withFsUrl("hdfs://localhost:8020")

.withFileNameFormat(fileNameFormat)

.withRecordFormat(format)

.withRotationPolicy(rotationPolicy)

.withSyncPolicy(syncPolicy);


Storm HDFS bolt equivalent in Spark Streaming.

2016-07-19 Thread Rajesh_Kalluri
Dell - Internal Use - Confidential

Dell - Internal Use - Confidential
While writing to Kafka from Storm, the hdfs bolt provides a nice way to batch 
the messages , rotate files, file name convention etc as shown below.

Do you know of something similar in Spark Streaming or do we have to roll our 
own? If anyone attempted this can you throw some pointers.

Every other streaming solution like Flume and NIFI handle logic like below.

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.6/bk_storm-user-guide/content/writing-data-with-storm-hdfs-connector.html

// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");

// Synchronize the filesystem after every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach 5 MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/foo/");


// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://localhost:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);




Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-19 Thread Ted Yu
hbase-spark module is in the up-coming hbase 2.0 release.
Currently it is in master branch of hbase git repo.

FYI

On Tue, Jul 19, 2016 at 8:27 PM, Andrew Ehrlich  wrote:

> There is a Spark<->HBase library that does this.  I used it once in a
> prototype (never tried in production through):
> http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/
>
> On Jul 19, 2016, at 9:34 AM, Yu Wei  wrote:
>
> Hi guys,
>
> I write spark application and want to store results generated by spark
> application to hbase.
> Do I need to access hbase via java api directly?
> Or is it better choice to use DAO similar as traditional RDBMS?  I suspect
> that there is major performance downgrade and other negative impacts using
> DAO. However, I have little knowledge in this field.
>
> Any advice?
>
> Thanks,
> Jared
>
>
>


Re: Spark Job trigger in production

2016-07-19 Thread Andrew Ehrlich
Another option is Oozie with the spark action: 
https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html 
 

> On Jul 18, 2016, at 12:15 AM, Jagat Singh  wrote:
> 
> You can use following options
> 
> * spark-submit from shell 
> * some kind of job server. See spark-jobserver for details
> * some notebook environment See Zeppelin for example
> 
> 
> 
> 
> 
> On 18 July 2016 at 17:13, manish jaiswal  > wrote:
> Hi,
> 
> 
> What is the best approach to trigger spark job in production cluster?
> 



Re: the spark job is so slow - almost frozen

2016-07-19 Thread Andrew Ehrlich
Try:

- filtering down the data as soon as possible in the job, dropping columns you 
don’t need.
- processing fewer partitions of the hive tables at a time
- caching frequently accessed data, for example dimension tables, lookup 
tables, or other datasets that are repeatedly accessed
- using the Spark UI to identify the bottlenecked resource
- remove features or columns from the output data, until it runs, then add them 
back in one at a time.
- creating a static dataset small enough to work, and editing the query, then 
retesting, repeatedly until you cut the execution time by a significant fraction
- Using the Spark UI or spark shell to check the skew and make sure partitions 
are evenly distributed

> On Jul 18, 2016, at 3:33 AM, Zhiliang Zhu  wrote:
> 
> Thanks a lot for your reply .
> 
> In effect , here we tried to run the sql on kettle, hive and spark hive (by 
> HiveContext) respectively, the job seems frozen  to finish to run .
> 
> In the 6 tables , need to respectively read the different columns in 
> different tables for specific information , then do some simple calculation 
> before output . 
> join operation is used most in the sql . 
> 
> Best wishes! 
> 
> 
> 
> 
> On Monday, July 18, 2016 6:24 PM, Chanh Le  wrote:
> 
> 
> Hi,
> What about the network (bandwidth) between hive and spark? 
> Does it run in Hive before then you move to Spark?
> Because It's complex you can use something like EXPLAIN command to show what 
> going on.
> 
> 
> 
> 
>  
>> On Jul 18, 2016, at 5:20 PM, Zhiliang Zhu > > wrote:
>> 
>> the sql logic in the program is very much complex , so do not describe the 
>> detailed codes   here . 
>> 
>> 
>> On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu > > wrote:
>> 
>> 
>> Hi All,  
>> 
>> Here we have one application, it needs to extract different columns from 6 
>> hive tables, and then does some easy calculation, there is around 100,000 
>> number of rows in each table,
>> finally need to output another table or file (with format of consistent 
>> columns) .
>> 
>>  However, after lots of days trying, the spark hive job is unthinkably slow 
>> - sometimes almost frozen. There is 5 nodes for spark cluster. 
>>  
>> Could anyone offer some help, some idea or clue is also good. 
>> 
>> Thanks in advance~
>> 
>> Zhiliang 
>> 
>> 
> 
> 
> 



Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-19 Thread Andrew Ehrlich
There is a Spark<->HBase library that does this.  I used it once in a prototype 
(never tried in production through): 
http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/
 


> On Jul 19, 2016, at 9:34 AM, Yu Wei  wrote:
> 
> Hi guys,
> 
> I write spark application and want to store results generated by spark 
> application to hbase.
> Do I need to access hbase via java api directly? 
> Or is it better choice to use DAO similar as traditional RDBMS?  I suspect 
> that there is major performance downgrade and other negative impacts using 
> DAO. However, I have little knowledge in this field.
> 
> Any advice?
> 
> Thanks,
> Jared



Re: Saving a pyspark.ml.feature.PCA model

2016-07-19 Thread Ajinkya Kale
I am using google cloud dataproc which comes with spark 1.6.1. So upgrade
is not really an option.
No way / hack to save the models in spark 1.6.1 ?

On Tue, Jul 19, 2016 at 8:13 PM Shuai Lin  wrote:

> It's added in not-released-yet 2.0.0 version.
>
> https://issues.apache.org/jira/browse/SPARK-13036
> https://github.com/apache/spark/commit/83302c3b
>
> so i guess you need to wait for 2.0 release (or use the current rc4).
>
> On Wed, Jul 20, 2016 at 6:54 AM, Ajinkya Kale 
> wrote:
>
>> Is there a way to save a pyspark.ml.feature.PCA model ? I know mllib has
>> that but mllib does not have PCA afaik. How do people do model persistence
>> for inference using the pyspark ml models ? Did not find any documentation
>> on model persistency for ml.
>>
>> --ajinkya
>>
>
>


Re: Heavy Stage Concentration - Ends With Failure

2016-07-19 Thread Andrew Ehrlich
Yea this is a good suggestion; also check 25th percentile, median, and 75th 
percentile to see how skewed the input data is.

If you find that the RDD’s partitions are skewed you can solve it either by 
changing the partitioner when you read the files like already suggested, or 
call repartition() on the RDD before the bottleneck to redistribute the 
data amongst the partitions by executing a shuffle.

> On Jul 19, 2016, at 6:19 PM, Kuchekar  wrote:
> 
> Hi,
> 
> Can you check if the RDD is partitioned correctly with correct partition 
> number (if you are manually setting the partition value.) . Try using Hash 
> partitioner while reading the files.
> 
> One way you can debug is by checking the number of records that executor has 
> compared to others in the Stage tab of the Spark UI.
> 
> Kuchekar, Nilesh
> 
> On Tue, Jul 19, 2016 at 8:16 PM, Aaron Jackson  > wrote:
> Hi,
> 
> I have a cluster with 15 nodes of which 5 are HDFS nodes.  I kick off a job 
> that creates some 120 stages.  Eventually, the active and pending stages 
> reduce down to a small bottleneck and it never fails... the tasks associated 
> with the 10 (or so) running tasks are always allocated to the same executor 
> on the same host.
> 
> Sooner or later, it runs out of memory ... or some other resource.  It falls 
> over and then they tasks are reallocated to another executor.
> 
> Why do we see such heavy concentration of tasks onto a single executor when 
> other executors are free?  Were the tasks assigned to an executor when the 
> job was decomposed into stages?
> 



Re: Filtering RDD Using Spark.mllib's ChiSqSelector

2016-07-19 Thread Tobi Bosede
Thanks Yanbo, will try that!

On Sun, Jul 17, 2016 at 10:26 PM, Yanbo Liang  wrote:

> Hi Tobi,
>
> Thanks for clarifying the question. It's very straight forward to convert
> the filtered RDD to DataFrame, you can refer the following code snippets:
>
> from pyspark.sql import Row
>
> rdd2 = filteredRDD.map(lambda v: Row(features=v))
>
> df = rdd2.toDF()
>
>
> Thanks
> Yanbo
>
> 2016-07-16 14:51 GMT-07:00 Tobi Bosede :
>
>> Hi Yanbo,
>>
>> Appreciate the response. I might not have phrased this correctly, but I
>> really wanted to know how to convert the pipeline rdd into a data frame. I
>> have seen the example you posted. However I need to transform all my data,
>> just not 1 line. So I did sucessfully use map to use the chisq selector to
>> filter the chosen features of my data. I just want to convert it to a df so
>> I can apply a logistic regression model from spark.ml.
>>
>> Trust me I would use the dataframes api if I could, but the chisq
>> functionality is not available to me in the python spark 1.4 api.
>>
>> Regards,
>> Tobi
>>
>> On Jul 16, 2016 4:53 AM, "Yanbo Liang"  wrote:
>>
>>> Hi Tobi,
>>>
>>> The MLlib RDD-based API does support to apply transformation on both
>>> Vector and RDD, but you did not use the appropriate way to do.
>>> Suppose you have a RDD with LabeledPoint in each line, you can refer the
>>> following code snippets to train a ChiSqSelectorModel model and do
>>> transformation:
>>>
>>> from pyspark.mllib.regression import LabeledPoint
>>>
>>> from pyspark.mllib.feature import ChiSqSelector
>>>
>>> data = [LabeledPoint(0.0, SparseVector(3, {0: 8.0, 1: 7.0})), 
>>> LabeledPoint(1.0, SparseVector(3, {1: 9.0, 2: 6.0})), LabeledPoint(1.0, 
>>> [0.0, 9.0, 8.0]), LabeledPoint(2.0, [8.0, 9.0, 5.0])]
>>>
>>> rdd = sc.parallelize(data)
>>>
>>> model = ChiSqSelector(1).fit(rdd)
>>>
>>> filteredRDD = model.transform(rdd.map(lambda lp: lp.features))
>>>
>>> filteredRDD.collect()
>>>
>>> However, we strongly recommend you to migrate to DataFrame-based API
>>> since the RDD-based API is switched to maintain mode.
>>>
>>> Thanks
>>> Yanbo
>>>
>>> 2016-07-14 13:23 GMT-07:00 Tobi Bosede :
>>>
 Hi everyone,

 I am trying to filter my features based on the spark.mllib
 ChiSqSelector.

 filteredData = vectorizedTestPar.map(lambda lp: LabeledPoint(lp.label,
 model.transform(lp.features)))

 However when I do the following I get the error below. Is there any
 other way to filter my data to avoid this error?

 filteredDataDF=filteredData.toDF()

 Exception: It appears that you are attempting to reference SparkContext 
 from a broadcast variable, action, or transforamtion. SparkContext can 
 only be used on the driver, not in code that it run on workers. For more 
 information, see SPARK-5063.


 I would directly use the spark.ml ChiSqSelector and work with dataframes, 
 but I am on spark 1.4 and using pyspark. So spark.ml's ChiSqSelector is 
 not available to me. filteredData is of type piplelineRDD, if that helps. 
 It is not a regular RDD. I think that may part of why calling toDF() is 
 not working.


 Thanks,

 Tobi


>>>
>


Re: Saving a pyspark.ml.feature.PCA model

2016-07-19 Thread Shuai Lin
It's added in not-released-yet 2.0.0 version.

https://issues.apache.org/jira/browse/SPARK-13036
https://github.com/apache/spark/commit/83302c3b

so i guess you need to wait for 2.0 release (or use the current rc4).

On Wed, Jul 20, 2016 at 6:54 AM, Ajinkya Kale  wrote:

> Is there a way to save a pyspark.ml.feature.PCA model ? I know mllib has
> that but mllib does not have PCA afaik. How do people do model persistence
> for inference using the pyspark ml models ? Did not find any documentation
> on model persistency for ml.
>
> --ajinkya
>


Re: spark worker continuously trying to connect to master and failed in standalone mode

2016-07-19 Thread Andrew Ehrlich
Troubleshooting steps:

$ telnet localhost 7077 (on master, to confirm port is open)
$ telnet  7077 (on slave, to confirm port is blocked)

If the port is available on the master from the master, but not on the master 
from the slave, check firewall settings on the master: 
https://help.ubuntu.com/lts/serverguide/firewall.html 

> On Jul 19, 2016, at 6:25 PM, Neil Chang  wrote:
> 
> Hi,
>   I have two virtual pcs on private cloud (ubuntu 14). I installed spark 2.0 
> preview on both machines. I then tried to test it with standalone mode.
> I have no problem start the master. However, when I start the worker (slave) 
> on another machine, it makes many attempts to connect to master and failed at 
> the end. 
>   I can ssh from each machine to another without any problem. I can also run 
> a master and worker at the same machine without any problem.
> 
> What did I miss? Any clue?
> 
> here are the messages:
> 
> WARN NativeCodeLoader: Unable to load native-hadoop library for your platform 
> ... using builtin-java classes where applicable
> ..
> INFO Worker: Connecting to master ip:7077 ... 
> INFO Worker: Retrying connection to master (attempt #1)
> ..
> INFO Worker: Retrying connection to master (attempt #7)
> java.lang.IllegalArgumentException: requirement failed: TransportClient has 
> not yet been set.
>at scala.Predef$.require(Predef.scala:224)
> ...
> WARN NettyRocEnv: Ignored failure: java.io.IOException: Connecting to ip:7077 
> timed out
> WARN Worker: Failed to connect to master ip.7077
> 
> 
> 
> Thanks,
> Neil



Re: Dataframe Transformation with Inner fields in Complex Datatypes.

2016-07-19 Thread java bigdata
Hi Ayan,
Thanks for your update.

All i am trying is to update an inner field in one of the
dataframe's complex type column. withColumn method adds or replaces
existing column. In my case column is a nested column. Please see the below
example i mentioned in the mail.

I dont have to add a new column. One way of my thinking to solve this is to
create a new complex type column(structtype), same as the one available in
dataframe, and during the process update the nested field. At the end add
the newly created struct type column to the dataframe and drop old one.
Disadvantage:
1. However, this will require iterating through millions of rows leading to
perf impact.
2. If there is only one/few columns to be updated, it may not be right way
to create a new column and add to dataframe.

Any help will be greatly appreciated!
Thanks.

On Monday, July 18, 2016, ayan guha  wrote:

> Hi
>
> withColumn adds the column. If you want different name, please use
> .alias() function.
>
> On Mon, Jul 18, 2016 at 2:16 AM, java bigdata  > wrote:
>
>> Hi Team,
>>
>> I am facing a major issue while transforming dataframe containing complex
>> datatype columns. I need to update the inner fields of complex datatype,
>> for eg: converting one inner field to UPPERCASE letters, and return the
>> same dataframe with new upper case values in it. Below is my issue
>> description. Kindly suggest/guide me a way forward.
>>
>> *My suggestion: *can we have a new version of 
>> *dataframe.withcolumn(,
>> udf($innerfieldreference), )*,
>> so that when this method gets executed, i get same dataframe with
>> transformed values.
>>
>>
>> *Issue Description:*
>> Using dataframe.withColumn(,udf($colname)) for inner fields in
>> struct/complex datatype, results in a new dataframe with the a new column
>> appended to it. "colname" in the above argument is given as fullname with
>> dot notation to access the struct/complex fields.
>>
>> For eg: hive table has columns: (id int, address struct> buildname:string, stname:string>>, line2:string>)
>>
>> I need to update the inner field 'buildname'. I can select the inner
>> field through dataframe as : df.select($"address.line1.buildname"), however
>> when I use df.withColumn("address.line1.buildname",
>> toUpperCaseUDF($"address.line1.buildname")), it is resulting in a new
>> dataframe with new column: "address.line1.buildname" appended, with
>> toUpperCaseUDF values from inner field buildname.
>>
>> How can I update the inner fields of the complex data types. Kindly
>> suggest.
>>
>> Thanks in anticipation.
>>
>> Best Regards,
>> Naveen Kumar.
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Should it be safe to embed Spark in Local Mode?

2016-07-19 Thread Brett Randall
This question is regarding
https://issues.apache.org/jira/browse/SPARK-15685 (StackOverflowError
(VirtualMachineError) or NoClassDefFoundError (LinkageError) should not
System.exit() in local mode) and hopes to draw attention-to and
discussion-on that issue.

I have a product that is hosted as a microservice, running in a
web-container e.g. Jetty, as a long-running service, publishing a REST
API.  For small-computations, to reduce latency, I wish to run Spark in
local mode.  For larger jobs the service might launch a remote job on a
cluster e.g. Spark-on-YARN.  Either way, there may be custom modules
deployed to the service from time-to-time, involving third-part libraries
etc.

My concern is as outlined in SPARK-15685.  If I have a third-party library,
and either direct or transient dependencies are not satisfied, when the
code is deployed and run I might suffer a NoClassDefFoundError.  Or there
may be some broken logic leading to a StackOverflowError
(VirtualMachineError).  Normally if this occurred in a plan
microservice/web-application, the thread handling the request would see the
unchecked Throwable/Error and fail, but otherwise the service continues.

With Spark in local mode, due to the quite-specific categorization and
handling of the aforementioned specific Throwable/Error types (ref
Utils.isFatalError
and other Scala definitions), the result when they are thrown is that Spark
deems that the JVM should be forcibly shutdown via System.exit(), thereby
killing the microservice.

Is it reasonable that in the face of the above Errors occuring, we should
ask that Spark does not exit the JVM, instead allowing some exception or
error to be thrown? The System.exit() approach seems aligned with the idea
of a command-line job batch and a quick-exit of the entire JVM and any
running threads, but it is poorly suited to running in local mode in a
microservice.

Thoughts?

Thanks,
Brett


spark worker continuously trying to connect to master and failed in standalone mode

2016-07-19 Thread Neil Chang
Hi,
  I have two virtual pcs on private cloud (ubuntu 14). I installed spark
2.0 preview on both machines. I then tried to test it with standalone mode.
I have no problem start the master. However, when I start the worker
(slave) on another machine, it makes many attempts to connect to master and
failed at the end.
  I can ssh from each machine to another without any problem. I can also
run a master and worker at the same machine without any problem.

What did I miss? Any clue?

here are the messages:

WARN NativeCodeLoader: Unable to load native-hadoop library for your
platform ... using builtin-java classes where applicable
..
INFO Worker: Connecting to master ip:7077 ...
INFO Worker: Retrying connection to master (attempt #1)
..
INFO Worker: Retrying connection to master (attempt #7)
java.lang.IllegalArgumentException: requirement failed: TransportClient has
not yet been set.
   at scala.Predef$.require(Predef.scala:224)
...
WARN NettyRocEnv: Ignored failure: java.io.IOException: Connecting to
ip:7077 timed out
WARN Worker: Failed to connect to master ip.7077



Thanks,
Neil


HiveContext , difficulties in accessing tables in hive schema's/database's other than default database.

2016-07-19 Thread satyajit vegesna
Hi All,

I have been trying to access tables from other schema's , apart from
default , to pull data into dataframe.

i was successful in doing it using the default schema in hive database.
But when i try any other schema/database in hive, i am getting below
error.(Have also not seen any examples related to accessing tables in other
schema/Database apart from default).

16/07/19 18:16:06 INFO hive.metastore: Connected to metastore.
16/07/19 18:16:08 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 472.3 KB, free 472.3 KB)
16/07/19 18:16:08 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 39.6 KB, free 511.9 KB)
16/07/19 18:16:08 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on localhost:41434 (size: 39.6 KB, free: 2.4 GB)
16/07/19 18:16:08 INFO spark.SparkContext: Created broadcast 0 from show at
sparkHive.scala:70
Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.hadoop.hive.ql.exec.Utilities.copyTableJobPropertiesToConf(Lorg/apache/hadoop/hive/ql/plan/TableDesc;Lorg/apache/hadoop/mapred/JobConf;)V
at
org.apache.spark.sql.hive.HadoopTableReader$.initializeLocalJobConfFunc(TableReader.scala:324)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$12.apply(TableReader.scala:276)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$12.apply(TableReader.scala:276)
at
org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
at
org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:176)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:195)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
at
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at
org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at
org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at
org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
at org.apache.spark.sql.DataFrame.org
$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
at org.apache.spark.sql.DataFrame.org
$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
at
org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
at
org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
at 

Unsubscribe

2016-07-19 Thread sjk


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Heavy Stage Concentration - Ends With Failure

2016-07-19 Thread Kuchekar
Hi,

Can you check if the RDD is partitioned correctly with correct partition
number (if you are manually setting the partition value.) . Try using Hash
partitioner while reading the files.

One way you can debug is by checking the number of records that executor
has compared to others in the Stage tab of the Spark UI.

Kuchekar, Nilesh

On Tue, Jul 19, 2016 at 8:16 PM, Aaron Jackson  wrote:

> Hi,
>
> I have a cluster with 15 nodes of which 5 are HDFS nodes.  I kick off a
> job that creates some 120 stages.  Eventually, the active and pending
> stages reduce down to a small bottleneck and it never fails... the tasks
> associated with the 10 (or so) running tasks are always allocated to the
> same executor on the same host.
>
> Sooner or later, it runs out of memory ... or some other resource.  It
> falls over and then they tasks are reallocated to another executor.
>
> Why do we see such heavy concentration of tasks onto a single executor
> when other executors are free?  Were the tasks assigned to an executor when
> the job was decomposed into stages?
>


Heavy Stage Concentration - Ends With Failure

2016-07-19 Thread Aaron Jackson
Hi,

I have a cluster with 15 nodes of which 5 are HDFS nodes.  I kick off a job
that creates some 120 stages.  Eventually, the active and pending stages
reduce down to a small bottleneck and it never fails... the tasks
associated with the 10 (or so) running tasks are always allocated to the
same executor on the same host.

Sooner or later, it runs out of memory ... or some other resource.  It
falls over and then they tasks are reallocated to another executor.

Why do we see such heavy concentration of tasks onto a single executor when
other executors are free?  Were the tasks assigned to an executor when the
job was decomposed into stages?


Re: Missing Exector Logs From Yarn After Spark Failure

2016-07-19 Thread ayan guha
If YARN log aggregation is enabled then logs will be moved to HDFS. You can
use yarn logs -applicationId  to view those logs.

On Wed, Jul 20, 2016 at 8:58 AM, Ted Yu  wrote:

> What's the value for yarn.log-aggregation.retain-seconds
> and yarn.log-aggregation-enable ?
>
> Which hadoop release are you using ?
>
> Thanks
>
> On Tue, Jul 19, 2016 at 3:23 PM, Rachana Srivastava <
> rachana.srivast...@markmonitor.com> wrote:
>
>> I am trying to find the root cause of recent Spark application failure in
>> production. When the Spark application is running I can check NodeManager's
>> yarn.nodemanager.log-dir property to get the Spark executor container logs.
>>
>> The container has logs for both the running Spark applications
>>
>> Here is the view of the container logs: drwx--x--- 3 yarn yarn 51 Jul 19
>> 09:04 application_1467068598418_0209 drwx--x--- 5 yarn yarn 141 Jul 19
>> 09:04 application_1467068598418_0210
>>
>> But when the application is killed both the application logs are
>> automatically deleted. I have set all the log retention setting etc in Yarn
>> to a very large number. But still these logs are deleted as soon as the
>> Spark applications are crashed.
>>
>> Question: How can we retain these Spark application logs in Yarn for
>> debugging when the Spark application is crashed for some reason.
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Little idea needed

2016-07-19 Thread ayan guha
Well this one keeps cropping up in every project especially when hadoop
implemented alongside MPP.
For the fact, there is no reliable out of box update operation available in
hdfs or hive or SPARK.
Hence, one approach is what Mitch suggested, that do not update. Rather
just keep all source records, by timestamping their arrival.
Another way is, if I think an data warehouse with open and closed records,
you can create a partition in hive only for open records. So, you can
refresh that partition in every run.
On 20 Jul 2016 06:08, "Mich Talebzadeh"  wrote:

> Well this is a classic.
>
> The initial load can be done through Sqoop (outside of Spark) or through
> JDBC connection in Spark. 10 million rows in nothing.
>
> Then you have to think of updates and deletes in addition to new rows.
>
> With Sqoop you can load from the last ID in the source table, assuming
> that you have a unique key in Your Oracle table.
>
> If you have 10 new roes and I assume you know how to load these rows from
> Oracle.
>
> I suggest that you add two additional columns to your HDFS/target table,
>
> ,op_type int
> ,op_time timestamp
>
> These two columns will specify the row type op_type = 1,2,3
> INSERT/UPDATE/DELETE and op_time = cast(from_unixtime(unix_timestamp())
> AS op_time) when the record was added.
>
> So you will end up with two additional columns in your HDFS table compared
> to Oracle table and that will be your staging table.
>
> Of course you can do real time analytics through Oracle GoldenGate that
> read the redolog of the source table in Oracle or better Sap Replication
> Server (SRS). You will achieve real-time integration between RDBMS tables
> and Big Data.
>
> Once you have you have the staging table (immutable) and the rest is
> pretty easy. You have the full Entity Life History in this case for records
> and you can do your queries on them.
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 19 July 2016 at 20:27, Aakash Basu  wrote:
>
>> Hi all,
>>
>> I'm trying to pull a full table from oracle, which is huge with some 10
>> million records which will be the initial load to HDFS.
>>
>> Then I will do delta loads everyday in the same folder in HDFS.
>>
>> Now, my query here is,
>>
>> DAY 0 - I did the initial load (full dump).
>>
>> DAY 1 - I'll load only that day's data which has suppose 10 records (5
>> old with some column's value altered and 5 new).
>>
>> Here, my question is, how will I push this file to HDFS through Spark
>> code, if I do append, it will create duplicates (which i don't want), if i
>> keep separate files and while using it in other program am giving the path
>> of it as folder which contains all files /. But in this case also the
>> registerTempTable will have duplicates for those 5 old rows.
>>
>> What is the BEST logic to be applied here?
>>
>> I tried to resolve this by doing a search in that file of the records if
>> matching load the new ones by deleting the old, but this will be time
>> consuming for such a huge record, right?
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
>>
>
>


Re: Missing Exector Logs From Yarn After Spark Failure

2016-07-19 Thread Ted Yu
What's the value for yarn.log-aggregation.retain-seconds
and yarn.log-aggregation-enable ?

Which hadoop release are you using ?

Thanks

On Tue, Jul 19, 2016 at 3:23 PM, Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> I am trying to find the root cause of recent Spark application failure in
> production. When the Spark application is running I can check NodeManager's
> yarn.nodemanager.log-dir property to get the Spark executor container logs.
>
> The container has logs for both the running Spark applications
>
> Here is the view of the container logs: drwx--x--- 3 yarn yarn 51 Jul 19
> 09:04 application_1467068598418_0209 drwx--x--- 5 yarn yarn 141 Jul 19
> 09:04 application_1467068598418_0210
>
> But when the application is killed both the application logs are
> automatically deleted. I have set all the log retention setting etc in Yarn
> to a very large number. But still these logs are deleted as soon as the
> Spark applications are crashed.
>
> Question: How can we retain these Spark application logs in Yarn for
> debugging when the Spark application is crashed for some reason.
>


Saving a pyspark.ml.feature.PCA model

2016-07-19 Thread Ajinkya Kale
Is there a way to save a pyspark.ml.feature.PCA model ? I know mllib has
that but mllib does not have PCA afaik. How do people do model persistence
for inference using the pyspark ml models ? Did not find any documentation
on model persistency for ml.

--ajinkya


Missing Exector Logs From Yarn After Spark Failure

2016-07-19 Thread Rachana Srivastava
I am trying to find the root cause of recent Spark application failure in 
production. When the Spark application is running I can check NodeManager's 
yarn.nodemanager.log-dir property to get the Spark executor container logs.

The container has logs for both the running Spark applications

Here is the view of the container logs: drwx--x--- 3 yarn yarn 51 Jul 19 09:04 
application_1467068598418_0209 drwx--x--- 5 yarn yarn 141 Jul 19 09:04 
application_1467068598418_0210

But when the application is killed both the application logs are automatically 
deleted. I have set all the log retention setting etc in Yarn to a very large 
number. But still these logs are deleted as soon as the Spark applications are 
crashed.

Question: How can we retain these Spark application logs in Yarn for debugging 
when the Spark application is crashed for some reason.


Re: Role-based S3 access outside of EMR

2016-07-19 Thread Andy Davidson
Hi Everett

I always do my initial data exploration and all our product development in
my local dev env. I typically select a small data set and copy it to my
local machine

My main() has an optional command line argument Œ- - runLocal¹ Normally I
load data from either hdfs:/// or S3n:// . If the arg is set I read from
file:///

Sometime I use a CLI arg Œ- -dataFileURL¹

So in your case I would log into my data cluster and use ³AWS s3 cp" to copy
the data into my cluster and then use ³SCP² to copy the data from the data
center back to my local env.

Andy

From:  Everett Anderson 
Date:  Tuesday, July 19, 2016 at 2:30 PM
To:  "user @spark" 
Subject:  Role-based S3 access outside of EMR

> Hi,
> 
> When running on EMR, AWS configures Hadoop to use their EMRFS Hadoop
> FileSystem implementation for s3:// URLs and seems to install the necessary S3
> credentials properties, as well.
> 
> Often, it's nice during development to run outside of a cluster even with the
> "local" Spark master, though, which I've found to be more troublesome. I'm
> curious if I'm doing this the right way.
> 
> There are two issues -- AWS credentials and finding the right combination of
> compatible AWS SDK and Hadoop S3 FileSystem dependencies.
> 
> Credentials and Hadoop Configuration
> 
> For credentials, some guides recommend setting AWS_SECRET_ACCESS_KEY and
> AWS_ACCESS_KEY_ID environment variables or putting the corresponding
> properties in Hadoop XML config files, but it seems better practice to rely on
> machine roles and not expose these.
> 
> What I end up doing is, in code, when not running on EMR, creating a
> DefaultAWSCredentialsProviderChain
>  ultAWSCredentialsProviderChain.html>  and then installing the following
> properties in the Hadoop Configuration using it:
> 
> fs.s3.awsAccessKeyId
> fs.s3n.awsAccessKeyId
> fs.s3a.awsAccessKeyId
> fs.s3.awsSecretAccessKey
> fs.s3n.awsSecretAccessKey
> fs.s3a.awsSecretAccessKey
> 
> I also set the fs.s3.impl and fs.s3n.impl properties to
> org.apache.hadoop.fs.s3a.S3AFileSystem to force them to use the S3A
> implementation since people usually use "s3://" URIs.
> 
> SDK and File System Dependencies
> 
> Some special combination 
> of the Hadoop version, AWS SDK version, and hadoop-aws is necessary.
> 
> One working S3A combination with Spark 1.6.1 + Hadoop 2.7.x for me seems to be
> with
> 
> --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
> 
> Is this generally what people do? Is there a better way?
> 
> I realize this isn't entirely a Spark-specific problem, but as so many people
> seem to be using S3 with Spark, I imagine this community's faced the problem a
> lot.
> 
> Thanks!
> 
> - Everett
> 




Re: Task not serializable: java.io.NotSerializableException: org.json4s.Serialization$$anon$1

2016-07-19 Thread RK Aduri
Did you check this:

case class Example(name : String, age ; Int)

there is a semicolon. should have been (age : Int)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Task-not-serializable-java-io-NotSerializableException-org-json4s-Serialization-anon-1-tp8233p27360.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Role-based S3 access outside of EMR

2016-07-19 Thread Everett Anderson
Hi,

When running on EMR, AWS configures Hadoop to use their EMRFS Hadoop
FileSystem implementation for s3:// URLs and seems to install the necessary
S3 credentials properties, as well.

Often, it's nice during development to run outside of a cluster even with
the "local" Spark master, though, which I've found to be more troublesome.
I'm curious if I'm doing this the right way.

There are two issues -- AWS credentials and finding the right combination
of compatible AWS SDK and Hadoop S3 FileSystem dependencies.

*Credentials and Hadoop Configuration*

For credentials, some guides recommend setting AWS_SECRET_ACCESS_KEY and
AWS_ACCESS_KEY_ID environment variables or putting the corresponding
properties in Hadoop XML config files, but it seems better practice to rely
on machine roles and not expose these.

What I end up doing is, in code, when not running on EMR, creating a
DefaultAWSCredentialsProviderChain

and then installing the following properties in the Hadoop Configuration
using it:

fs.s3.awsAccessKeyId
fs.s3n.awsAccessKeyId
fs.s3a.awsAccessKeyId
fs.s3.awsSecretAccessKey
fs.s3n.awsSecretAccessKey
fs.s3a.awsSecretAccessKey

I also set the fs.s3.impl and fs.s3n.impl properties to
org.apache.hadoop.fs.s3a.S3AFileSystem to force them to use the S3A
implementation since people usually use "s3://" URIs.

*SDK and File System Dependencies*

Some special combination
 of the Hadoop version,
AWS SDK version, and hadoop-aws is necessary.

One working S3A combination with Spark 1.6.1 + Hadoop 2.7.x for me seems to
be with

--packages
com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2

Is this generally what people do? Is there a better way?

I realize this isn't entirely a Spark-specific problem, but as so many
people seem to be using S3 with Spark, I imagine this community's faced the
problem a lot.

Thanks!

- Everett


Re: spark-submit local and Akka startup timeouts

2016-07-19 Thread Bryan Cutler
The patch I was referring to doesn't help on the ActorSystem startup
unfortunately.  As best I can tell the property
"akka.remote.startup-timeout" is what controls this timeout.  You can try
setting this to something greater in your Spark conf and hopefully that
would work.  Otherwise you might have luck trying a more recent version of
Spark, such as 1.6.2 or even 2.0.0 (soon to be released) which no longer
uses Akka and the ActorSystem.  Hope that helps!

On Tue, Jul 19, 2016 at 2:29 AM, Rory Waite  wrote:

> Sorry Bryan, I should have mentioned that I'm running 1.6.0 for hadoop2.6.
> The binaries were downloaded from the Spark website.
>
>
> We're free to upgrade to Spark, create custom builds, etc. Please let me
> know how to display the config property.
>
>   
> www.sdl.com
>
>
> SDL PLC confidential, all rights reserved. If you are not the intended
> recipient of this mail SDL requests and requires that you delete it without
> acting upon or copying any of its contents, and we further request that you
> advise us.
>
> SDL PLC is a public limited company registered in England and Wales.
> Registered number: 02675207.
> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
> 7DY, UK.
> --
> *From:* Bryan Cutler 
> *Sent:* 19 July 2016 02:20:38
> *To:* Rory Waite
> *Cc:* user
> *Subject:* Re: spark-submit local and Akka startup timeouts
>
>
> Hi Rory, for starters what version of Spark are you using?  I believe that
> in a 1.5.? release (I don't know which one off the top of my head) there
> was an addition that would also display the config property when a timeout
> happened.  That might help some if you are able to upgrade.
>
> On Jul 18, 2016 9:34 AM, "Rory Waite"  wrote:
>
>> Hi All,
>>
>> We have created a regression test for a spark job that is executed during
>> our automated build. It executes a spark-submit with a local master,
>> processes some data, and the exits. We have an issue in that we get a
>> non-deterministic timeout error. It seems to be when the spark context
>> tries to initialise Akka (stack trace below). It doesn't happen often, but
>> when it does it causes the whole build to fail.
>>
>> The machines that run these tests get very heavily loaded, with many
>> regression tests running simultaneously. My theory is that the spark-submit
>> is sometimes unable to initialise Akka in time because the machines are so
>> heavily loaded with the other tests. My first thought was to try to tune
>> some parameter to extend the timeout, but I couldn't find anything in the
>> documentation. The timeout is short at 10s, whereas the default akka
>> timeout is set at 100s.
>>
>> Is there a way to adjust this timeout?
>>
>> 16/07/17 00:04:22 ERROR SparkContext: Error initializing SparkContext.
>> java.util.concurrent.TimeoutException: Futures timed out after [1
>> milliseconds]
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> at
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> at scala.concurrent.Await$.result(package.scala:107)
>> at akka.remote.Remoting.start(Remoting.scala:179)
>> at
>> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
>> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
>> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
>> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
>> at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
>> at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
>> at
>> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
>> at
>> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
>> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
>> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
>> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
>> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
>> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
>> at org.apache.spark.SparkContext.(SparkContext.scala:457)
>> at com.sdl.nntrainer.NNTrainer$.main(NNTrainer.scala:418)
>> at com.sdl.nntrainer.NNTrainer.main(NNTrainer.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> 

Re: Task not serializable: java.io.NotSerializableException: org.json4s.Serialization$$anon$1

2016-07-19 Thread joshuata
It looks like the problem is that the parse function non-serializeable. This
is most likely because the formats variable is local to the ParseJson
object, and therefore not globally accessible to the cluster. Generally this
problem can be solved by moving the variable inside the closure so that it
is distributed to each worker.

In this specific instance, it makes far more sense to use the  json
datasource
  
provided by newer versions of Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Task-not-serializable-java-io-NotSerializableException-org-json4s-Serialization-anon-1-tp8233p27359.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Little idea needed

2016-07-19 Thread Mich Talebzadeh
Well this is a classic.

The initial load can be done through Sqoop (outside of Spark) or through
JDBC connection in Spark. 10 million rows in nothing.

Then you have to think of updates and deletes in addition to new rows.

With Sqoop you can load from the last ID in the source table, assuming that
you have a unique key in Your Oracle table.

If you have 10 new roes and I assume you know how to load these rows from
Oracle.

I suggest that you add two additional columns to your HDFS/target table,

,op_type int
,op_time timestamp

These two columns will specify the row type op_type = 1,2,3
INSERT/UPDATE/DELETE and op_time = cast(from_unixtime(unix_timestamp()) AS
op_time) when the record was added.

So you will end up with two additional columns in your HDFS table compared
to Oracle table and that will be your staging table.

Of course you can do real time analytics through Oracle GoldenGate that
read the redolog of the source table in Oracle or better Sap Replication
Server (SRS). You will achieve real-time integration between RDBMS tables
and Big Data.

Once you have you have the staging table (immutable) and the rest is pretty
easy. You have the full Entity Life History in this case for records and
you can do your queries on them.

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 19 July 2016 at 20:27, Aakash Basu  wrote:

> Hi all,
>
> I'm trying to pull a full table from oracle, which is huge with some 10
> million records which will be the initial load to HDFS.
>
> Then I will do delta loads everyday in the same folder in HDFS.
>
> Now, my query here is,
>
> DAY 0 - I did the initial load (full dump).
>
> DAY 1 - I'll load only that day's data which has suppose 10 records (5 old
> with some column's value altered and 5 new).
>
> Here, my question is, how will I push this file to HDFS through Spark
> code, if I do append, it will create duplicates (which i don't want), if i
> keep separate files and while using it in other program am giving the path
> of it as folder which contains all files /. But in this case also the
> registerTempTable will have duplicates for those 5 old rows.
>
> What is the BEST logic to be applied here?
>
> I tried to resolve this by doing a search in that file of the records if
> matching load the new ones by deleting the old, but this will be time
> consuming for such a huge record, right?
>
> Please help!
>
> Thanks,
> Aakash.
>


Re: Little idea needed

2016-07-19 Thread Jörn Franke
Well as far as I know there is some update statement planned for spark, but not 
sure which release. You could alternatively use Hive+Orc. 
Another alternative would be to add the deltas in a separate file and when 
accessing the table filtering out the double entries. From time to time you 
could have a merge process creating one file out of all the deltas.

> On 19 Jul 2016, at 21:27, Aakash Basu  wrote:
> 
> Hi all,
> 
> I'm trying to pull a full table from oracle, which is huge with some 10 
> million records which will be the initial load to HDFS.
> 
> Then I will do delta loads everyday in the same folder in HDFS.
> 
> Now, my query here is,
> 
> DAY 0 - I did the initial load (full dump).
> 
> DAY 1 - I'll load only that day's data which has suppose 10 records (5 old 
> with some column's value altered and 5 new).
> 
> Here, my question is, how will I push this file to HDFS through Spark code, 
> if I do append, it will create duplicates (which i don't want), if i keep 
> separate files and while using it in other program am giving the path of it 
> as folder which contains all files /. But in this case also the 
> registerTempTable will have duplicates for those 5 old rows.
> 
> What is the BEST logic to be applied here?
> 
> I tried to resolve this by doing a search in that file of the records if 
> matching load the new ones by deleting the old, but this will be time 
> consuming for such a huge record, right?
> 
> Please help!
> 
> Thanks,
> Aakash.


Re: I'm trying to understand how to compile Spark

2016-07-19 Thread Jacek Laskowski
Hi,

hadoop-2.7 would be more fresh. You don't need hadoop.version when the
defaults are fine. 2.7.2 for hadoop-2.7 profile.

Jacdk

On 19 Jul 2016 6:09 p.m., "Jakob Odersky"  wrote:

> Hi Eli,
>
> to build spark, just run
>
> build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests
> package
>
> in your source directory, where package is the actual word "package".
> This will recompile the whole project, so it may take a while when
> running the first time.
> Replacing a single file in an existing jar is not recommended unless
> it is for a quick test, so I would also suggest that you give your
> local spark compilation a custom version as to avoid any ambiguity if
> you depend on it from somewhere else.
>
> Check out this page
> http://spark.apache.org/docs/1.4.1/building-spark.html for more
> detailed information on the build process.
>
> --jakob
>
>
> On Tue, Jul 19, 2016 at 6:42 AM, Ted Yu  wrote:
> > org.apache.spark.mllib.fpm is not a maven goal.
> >
> > -pl is For Individual Projects.
> >
> > Your first build action should not include -pl.
> >
> >
> > On Tue, Jul 19, 2016 at 4:22 AM, Eli Super  wrote:
> >>
> >> Hi
> >>
> >> I have a windows laptop
> >>
> >> I just downloaded the spark 1.4.1 source code.
> >>
> >> I try to compile org.apache.spark.mllib.fpm with mvn
> >>
> >> My goal is to replace original org\apache\spark\mllib\fpm\* in
> >> spark-assembly-1.4.1-hadoop2.6.0.jar
> >>
> >> As I understand from this link
> >>
> >>
> >>
> https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse
> >>
> >>
> >> I need to execute following command : build/mvn package -DskipTests -pl
> >> assembly
> >> I executed : mvn org.apache.spark.mllib.fpm  -DskipTests -pl assembly
> >>
> >> Then I got an error
> >>  [INFO] Scanning for projects...
> >> [ERROR] [ERROR] Could not find the selected project in the reactor:
> >> assembly @
> >>
> >> Thanks for any help
> >>
> >>
> >>
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Little idea needed

2016-07-19 Thread Aakash Basu
Hi all,

I'm trying to pull a full table from oracle, which is huge with some 10
million records which will be the initial load to HDFS.

Then I will do delta loads everyday in the same folder in HDFS.

Now, my query here is,

DAY 0 - I did the initial load (full dump).

DAY 1 - I'll load only that day's data which has suppose 10 records (5 old
with some column's value altered and 5 new).

Here, my question is, how will I push this file to HDFS through Spark code,
if I do append, it will create duplicates (which i don't want), if i keep
separate files and while using it in other program am giving the path of it
as folder which contains all files /. But in this case also the
registerTempTable will have duplicates for those 5 old rows.

What is the BEST logic to be applied here?

I tried to resolve this by doing a search in that file of the records if
matching load the new ones by deleting the old, but this will be time
consuming for such a huge record, right?

Please help!

Thanks,
Aakash.


Re: Presentation in London: Running Spark on Hive or Hive on Spark

2016-07-19 Thread Ashok Kumar
Thanks Mich looking forward to it :) 

On Tuesday, 19 July 2016, 19:13, Mich Talebzadeh 
 wrote:
 

 Hi all,
This will be in London tomorrow Wednesday 20th July starting at 18:00 hour for 
refreshments and kick off at 18:30, 5 minutes walk from Canary Wharf Station, 
Jubilee Line 
If you wish you can register and get more info here
It will be in La Tasca West India Docks Road E14 
and especially if you like Spanish food :)
Regards,



Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destructionof data or any other property which may arise from relying 
on this email's technical content is explicitly disclaimed.The author will in 
no case be liable for any monetary damages arising from suchloss, damage or 
destruction.  
On 15 July 2016 at 11:06, Joaquin Alzola  wrote:

It is on the 20th (Wednesday) next week. From: Marco Mistroni 
[mailto:mmistr...@gmail.com]
Sent: 15 July 2016 11:04
To: Mich Talebzadeh 
Cc: user @spark ; user 
Subject: Re: Presentation in London: Running Spark on Hive or Hive on Spark Dr 
Mich  do you have any slides or videos available for the presentation you did 
@Canary Wharf?kindest regards marco On Wed, Jul 6, 2016 at 10:37 PM, Mich 
Talebzadeh  wrote:
Dear forum members I will be presenting on the topic of "Running Spark on Hive 
or Hive on Spark, your mileage varies" in Future of Data: London 
DetailsOrganized by: HortonworksDate: Wednesday, July 20, 2016, 6:00 PM to 8:30 
PM Place: LondonLocation: One Canada Square, Canary Wharf,  London E14 
5AB.Nearest Underground:  Canary Warf (map)If you are interested please 
register hereLooking forward to seeing those who can make it to have an 
interesting discussion and leverage your experience.Regards,
Dr Mich Talebzadeh LinkedIn 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk.Any 
and all responsibility for any loss, damage or destruction of data or any other 
property which may arise from relying on this email's technical content is 
explicitly disclaimed. The author will in no case be liable for any monetary 
damages arising from such loss, damage or destruction. 
 This email is confidential and may be subject to privilege. If you are not the 
intended recipient, please do not copy or disclose its content but contact the 
sender immediately upon receipt.



  

Re: Execute function once on each node

2016-07-19 Thread Rabin Banerjee
" I am working on a spark application that requires the ability to run a
function on each node in the cluster
"
--
Use Apache Ignite instead of Spark. Trust me it's awesome for this use case.

Regards,
Rabin Banerjee
On Jul 19, 2016 3:27 AM, "joshuata"  wrote:

> I am working on a spark application that requires the ability to run a
> function on each node in the cluster. This is used to read data from a
> directory that is not globally accessible to the cluster. I have tried
> creating an RDD with n elements and n partitions so that it is evenly
> distributed among the n nodes, and then mapping a function over the RDD.
> However, the runtime makes no guarantees that each partition will be stored
> on a separate node. This means that the code will run multiple times on the
> same node while never running on another.
>
> I have looked through the documentation and source code for both RDDs and
> the scheduler, but I haven't found anything that will do what I need. Does
> anybody know of a solution I could use?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Strange behavior including memory leak and NPE

2016-07-19 Thread rachmaninovquartet
Hi,

I've been fighting with a strange situation today. I'm trying to add two
entries for each of the distinct rows of an account, except for the first
and last (by date). Here's an example of some of the code. I can't get the
subset to continue forward:

var acctIdList = X_train.select("m_acct_id").distinct()
acctIdList = acctIdList.filter("m_acct_id is not null")

 for (id <- acctIdList) {
println("m_acct_id = " + id.getInt(0))
val subset = X_train.where("m_acct_id in (" + id.getInt(0).toString +
")") 
 }

The println's will work, if I remove the subsetting logic from the for loop,
and a few iterations of the loop will work with the subsetting logic. I'm
thinking this might be because the creations of these dataframes in the for
loop are eating up memory too quickly. So I might need a different
implementation. This is the logic I'm trying to translate from pandas, if
that helps:

X_train = pd.concat([X_train.groupby('m_acct_id').apply(lambda x:
pd.concat([x.iloc[i: i + k] for i in range(len(x.index) - k + 1)]))])

and here is the top of the stack trace, I tried on Spark 1.5.2 and 1.6.2:

16/07/19 14:39:37 ERROR Executor: Managed memory leak detected; size =
33816576 bytes, TID = 1908
16/07/19 14:39:37 ERROR Executor: Exception in task 1.0 in stage 96.0 (TID
1908)
java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.(DataFrame.scala:131)
at
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:755)
at org.apache.spark.sql.DataFrame.where(DataFrame.scala:792)


Any advice on how to keep moving, would be much appreciated!

Thanks,

Ian




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behavior-including-memory-leak-and-NPE-tp27358.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: how to setup the development environment of spark with IntelliJ on ubuntu

2016-07-19 Thread joshuata
I have found the easiest way to set up a development platform is to use the 
databricks sbt-spark-package plugin
  (assuming you are using
scala+sbt). You simply add the plugin to your /project/plugins.sbt
file and add the sparkVersion to your build.sbt file. It automatically loads
the necessary packages to build your applications.

It also provides the sbt console command that sets up a local spark repl to
prototype code against. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-the-development-environment-of-spark-with-IntelliJ-on-ubuntu-tp27333p27357.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming takes longer time to read json into dataframes

2016-07-19 Thread Diwakar Dhanuskodi
Okay got it regarding parallelism that  you  are  saying .

Yes , We use dataframe to infer schema and process data. The json schema has 
xml data as one of key value pair.  Xml data needs to be processed  in 
foreachRDD. Json schema doesn't  change often. 

Regards,
Diwakar


Sent from Samsung Mobile.

 Original message From: Cody Koeninger 
 Date:19/07/2016  20:49  (GMT+05:30) 
To: Diwakar Dhanuskodi  Cc: 
Martin Eden , user  
Subject: Re: Spark streaming takes longer time to read json into 
dataframes 
Yes, if you need more parallelism, you need to either add more kafka
partitions or shuffle in spark.

Do you actually need the dataframe api, or are you just using it as a
way to infer the json schema?  Inferring the schema is going to
require reading through the RDD once before doing any other work.  You
may be better off defining your schema in advance.

On Sun, Jul 17, 2016 at 9:33 PM, Diwakar Dhanuskodi
 wrote:
> Hi,
>
> Repartition would  create  shuffle  over  network  which  I should  avoid
> to  reduce processing time because the size of messages at most in a batch
> will  be  5G.
>  Partitioning topic and parallelize receiving in Direct Stream might do  the
> trick.
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Martin Eden 
> Date:16/07/2016 14:01 (GMT+05:30)
> To: Diwakar Dhanuskodi 
> Cc: user 
> Subject: Re: Spark streaming takes longer time to read json into dataframes
>
> Hi,
>
> I would just do a repartition on the initial direct DStream since otherwise
> each RDD in the stream has exactly as many partitions as you have partitions
> in the Kafka topic (in your case 1). Like that receiving is still done in
> only 1 thread but at least the processing further down is done in parallel.
>
> If you want to parallelize your receiving as well I would partition my Kafka
> topic and then the RDDs in the initial DStream will have as many partitions
> as you set in Kafka.
>
> Have you seen this?
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> M
>
> On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi
>  wrote:
>>
>>
>> -- Forwarded message --
>> From: Diwakar Dhanuskodi 
>> Date: Sat, Jul 16, 2016 at 9:30 AM
>> Subject: Re: Spark streaming takes longer time to read json into
>> dataframes
>> To: Jean Georges Perrin 
>>
>>
>> Hello,
>>
>> I need it on memory.  Increased executor memory to 25G and executor cores
>> to 3. Got same result. There is always one task running under executor for
>> rdd.read.json() because rdd partition size is 1 . Doing hash partitioning
>> inside foreachRDD is a good approach?
>>
>> Regards,
>> Diwakar.
>>
>> On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin  wrote:
>>>
>>> Do you need it on disk or just push it to memory? Can you try to increase
>>> memory or # of cores (I know it sounds basic)
>>>
>>> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi
>>> >  wrote:
>>> >
>>> > Hello,
>>> >
>>> > I have 400K json messages pulled from Kafka into spark streaming using
>>> > DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
>>> > single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
>>> > convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
>>> > dataframe.
>>> >
>>> > I am running in Yarn client mode with executor memory as 15G and
>>> > executor cores as 2.
>>> >
>>> > Caching rdd before converting into dataframe  doesn't change processing
>>> > time. Whether introducing hash partitions inside foreachRDD  will help? 
>>> > (or)
>>> > Will partitioning topic and have more than one DirectStream help?. How 
>>> > can I
>>> > approach this situation to reduce time in converting to dataframe..
>>> >
>>> > Regards,
>>> > Diwakar.
>>>
>>
>>
>


Re: Spark streaming takes longer time to read json into dataframes

2016-07-19 Thread Diwakar Dhanuskodi
Okay got that bout parallelism that  you  are  saying .

Yes , We use dataframe to infer schema and process data. The json schema has 
xml data as one of key value pair.  Xml data needs to be processed  in 
foreachRDD. Json schema doesn't  change often. 

Regards,
Diwakar.




Sent from Samsung Mobile.

 Original message From: Cody Koeninger 
 Date:19/07/2016  20:49  (GMT+05:30) 
To: Diwakar Dhanuskodi  Cc: 
Martin Eden , user  
Subject: Re: Spark streaming takes longer time to read json into 
dataframes 
Yes, if you need more parallelism, you need to either add more kafka
partitions or shuffle in spark.

Do you actually need the dataframe api, or are you just using it as a
way to infer the json schema?  Inferring the schema is going to
require reading through the RDD once before doing any other work.  You
may be better off defining your schema in advance.

On Sun, Jul 17, 2016 at 9:33 PM, Diwakar Dhanuskodi
 wrote:
> Hi,
>
> Repartition would  create  shuffle  over  network  which  I should  avoid
> to  reduce processing time because the size of messages at most in a batch
> will  be  5G.
>  Partitioning topic and parallelize receiving in Direct Stream might do  the
> trick.
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Martin Eden 
> Date:16/07/2016 14:01 (GMT+05:30)
> To: Diwakar Dhanuskodi 
> Cc: user 
> Subject: Re: Spark streaming takes longer time to read json into dataframes
>
> Hi,
>
> I would just do a repartition on the initial direct DStream since otherwise
> each RDD in the stream has exactly as many partitions as you have partitions
> in the Kafka topic (in your case 1). Like that receiving is still done in
> only 1 thread but at least the processing further down is done in parallel.
>
> If you want to parallelize your receiving as well I would partition my Kafka
> topic and then the RDDs in the initial DStream will have as many partitions
> as you set in Kafka.
>
> Have you seen this?
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> M
>
> On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi
>  wrote:
>>
>>
>> -- Forwarded message --
>> From: Diwakar Dhanuskodi 
>> Date: Sat, Jul 16, 2016 at 9:30 AM
>> Subject: Re: Spark streaming takes longer time to read json into
>> dataframes
>> To: Jean Georges Perrin 
>>
>>
>> Hello,
>>
>> I need it on memory.  Increased executor memory to 25G and executor cores
>> to 3. Got same result. There is always one task running under executor for
>> rdd.read.json() because rdd partition size is 1 . Doing hash partitioning
>> inside foreachRDD is a good approach?
>>
>> Regards,
>> Diwakar.
>>
>> On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin  wrote:
>>>
>>> Do you need it on disk or just push it to memory? Can you try to increase
>>> memory or # of cores (I know it sounds basic)
>>>
>>> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi
>>> >  wrote:
>>> >
>>> > Hello,
>>> >
>>> > I have 400K json messages pulled from Kafka into spark streaming using
>>> > DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
>>> > single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
>>> > convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
>>> > dataframe.
>>> >
>>> > I am running in Yarn client mode with executor memory as 15G and
>>> > executor cores as 2.
>>> >
>>> > Caching rdd before converting into dataframe  doesn't change processing
>>> > time. Whether introducing hash partitions inside foreachRDD  will help? 
>>> > (or)
>>> > Will partitioning topic and have more than one DirectStream help?. How 
>>> > can I
>>> > approach this situation to reduce time in converting to dataframe..
>>> >
>>> > Regards,
>>> > Diwakar.
>>>
>>
>>
>


Re: Presentation in London: Running Spark on Hive or Hive on Spark

2016-07-19 Thread Mich Talebzadeh
Hi all,

This will be in London tomorrow Wednesday 20th July starting at 18:00 hour
for refreshments and kick off at 18:30, 5 minutes walk from Canary Wharf
Station, Jubilee Line

If you wish you can register and get more info here


It will be in La Tasca West India Docks Road E14


and especially if you like Spanish food :)

Regards,




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 July 2016 at 11:06, Joaquin Alzola  wrote:

> It is on the 20th (Wednesday) next week.
>
>
>
> *From:* Marco Mistroni [mailto:mmistr...@gmail.com]
> *Sent:* 15 July 2016 11:04
> *To:* Mich Talebzadeh 
> *Cc:* user @spark ; user 
> *Subject:* Re: Presentation in London: Running Spark on Hive or Hive on
> Spark
>
>
>
> Dr Mich
>
>   do you have any slides or videos available for the presentation you did
> @Canary Wharf?
>
> kindest regards
>
>  marco
>
>
>
> On Wed, Jul 6, 2016 at 10:37 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
> Dear forum members
>
>
>
> I will be presenting on the topic of "Running Spark on Hive or Hive on
> Spark, your mileage varies" in Future of Data: London
> 
>
> *Details*
>
> *Organized by: Hortonworks *
>
> *Date: Wednesday, July 20, 2016, 6:00 PM to 8:30 PM *
>
> *Place: London*
>
> *Location: One Canada Square, Canary Wharf,  London E14 5AB.*
>
> *Nearest Underground:  Canary Warf (map
> )
> *
>
> If you are interested please register here
> 
>
> Looking forward to seeing those who can make it to have an interesting
> discussion and leverage your experience.
>
> Regards,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> This email is confidential and may be subject to privilege. If you are not
> the intended recipient, please do not copy or disclose its content but
> contact the sender immediately upon receipt.
>


Re: Spark 7736

2016-07-19 Thread Holden Karau
Indeed there is, signup for an Apache JIRA account then then when you visit
the JIRA page logged in you should see a "reopen issue" button. For issues
like this (reopening a JIRA) - you might find the dev list to be more
useful.

On Wed, Jul 13, 2016 at 4:47 AM, ayan guha  wrote:

> Hi
>
> I am facing same issue reporting on Spark 7736
>  on Spark 1.6.0. Is it
> any way to reopen the Jira?
>
> Reproduction steps attached.
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: spark single PROCESS_LOCAL task

2016-07-19 Thread Holden Karau
So its possible that you have a lot of data in one of the partitions which
is local to that process, maybe you could cache & count the upstream RDD
and see what the input partitions look like? On the otherhand - using
groupByKey is often a bad sign to begin with - can you rewrite your code to
avoid this?

On Fri, Jul 15, 2016 at 10:57 AM, Matt K  wrote:

> Hi all,
>
> I'm seeing some curious behavior which I have a hard time interpreting. I
> have a job which does a "groupByKey" and results in 300 executors. 299 are
> run in NODE_LOCAL mode. 1 executor is run in PROCESS_LOCAL mode.
>
> The 1 executor that runs in PROCESS_LOCAL mode gets about 10x as much
> input as the other executors. It dies with OOM, and the job fails.
>
> Only working theory I have is that there's a single key which has a ton of
> data tied to it. Even so, I can't explain why it's run in PROCESS_LOCAL
> mode and not others.
>
> Anyone has ideas?
>
> Thanks,
> -Matt
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: which one spark ml or spark mllib

2016-07-19 Thread Holden Karau
So Spark ML is going to be the actively developed Machine Learning library
going forward, however back in Spark 1.5 it was still relatively new and an
experimental component so not all of the the save/load support implemented
for the same models. That being said for 2.0 ML doesn't have PMML export
yet (although there is a 3rd party library to help with that
https://github.com/jpmml/jpmml-spark ). I'd encourage you to try and see if
you can upgrade work with Spark ML going forward - but if your stuck at
1.5.X some of what you need is going to only be available in Spark MLlib.
(You could also write your own model export by hand in Spark ML but thats a
lot of work).

On Tue, Jul 19, 2016 at 2:55 AM, pseudo oduesp 
wrote:

> HI,
>
> i don't have any idea why we have to library  ML and mlib
>
> ml you can use it with data frame and mllib with rdd but ml have some
> lakes like:
> save model most important if you want create web api with score
>
> my question why we don't have all features in MLlib on ML ?
>
>
> ( i use pyspark 1.5.0  because entreise restrict  of all a)
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: transtition SQLContext to SparkSession

2016-07-19 Thread Reynold Xin
Yes. But in order to access methods available only in HiveContext a user
cast is required.

On Tuesday, July 19, 2016, Maciej Bryński  wrote:

> @Reynold Xin,
> How this will work with Hive Support ?
> SparkSession.sqlContext return HiveContext ?
>
> 2016-07-19 0:26 GMT+02:00 Reynold Xin 
> >:
> > Good idea.
> >
> > https://github.com/apache/spark/pull/14252
> >
> >
> >
> > On Mon, Jul 18, 2016 at 12:16 PM, Michael Armbrust <
> mich...@databricks.com >
> > wrote:
> >>
> >> + dev, reynold
> >>
> >> Yeah, thats a good point.  I wonder if SparkSession.sqlContext should be
> >> public/deprecated?
> >>
> >> On Mon, Jul 18, 2016 at 8:37 AM, Koert Kuipers  > wrote:
> >>>
> >>> in my codebase i would like to gradually transition to SparkSession, so
> >>> while i start using SparkSession i also want a SQLContext to be
> available as
> >>> before (but with a deprecated warning when i use it). this should be
> easy
> >>> since SQLContext is now a wrapper for SparkSession.
> >>>
> >>> so basically:
> >>> val session = SparkSession.builder.set(..., ...).getOrCreate()
> >>> val sqlc = new SQLContext(session)
> >>>
> >>> however this doesnt work, the SQLContext constructor i am trying to use
> >>> is private. SparkSession.sqlContext is also private.
> >>>
> >>> am i missing something?
> >>>
> >>> a non-gradual switch is not very realistic in any significant codebase,
> >>> and i do not want to create SparkSession and SQLContext independendly
> (both
> >>> from same SparkContext) since that can only lead to confusion and
> >>> inconsistent settings.
> >>
> >>
> >
>
>
>
> --
> Maciek Bryński
>


Re: transtition SQLContext to SparkSession

2016-07-19 Thread Maciej Bryński
@Reynold Xin,
How this will work with Hive Support ?
SparkSession.sqlContext return HiveContext ?

2016-07-19 0:26 GMT+02:00 Reynold Xin :
> Good idea.
>
> https://github.com/apache/spark/pull/14252
>
>
>
> On Mon, Jul 18, 2016 at 12:16 PM, Michael Armbrust 
> wrote:
>>
>> + dev, reynold
>>
>> Yeah, thats a good point.  I wonder if SparkSession.sqlContext should be
>> public/deprecated?
>>
>> On Mon, Jul 18, 2016 at 8:37 AM, Koert Kuipers  wrote:
>>>
>>> in my codebase i would like to gradually transition to SparkSession, so
>>> while i start using SparkSession i also want a SQLContext to be available as
>>> before (but with a deprecated warning when i use it). this should be easy
>>> since SQLContext is now a wrapper for SparkSession.
>>>
>>> so basically:
>>> val session = SparkSession.builder.set(..., ...).getOrCreate()
>>> val sqlc = new SQLContext(session)
>>>
>>> however this doesnt work, the SQLContext constructor i am trying to use
>>> is private. SparkSession.sqlContext is also private.
>>>
>>> am i missing something?
>>>
>>> a non-gradual switch is not very realistic in any significant codebase,
>>> and i do not want to create SparkSession and SQLContext independendly (both
>>> from same SparkContext) since that can only lead to confusion and
>>> inconsistent settings.
>>
>>
>



-- 
Maciek Bryński

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark driver getting out of memory

2016-07-19 Thread RK Aduri
Just want to see if this helps.

Are you doing heavy collects and persist that? If that is so, you might
want to parallelize that collection by converting to an RDD.

Thanks,
RK

On Tue, Jul 19, 2016 at 12:09 AM, Saurav Sinha 
wrote:

> Hi Mich,
>
>1. In what mode are you running the spark standalone, yarn-client,
>yarn cluster etc
>
> Ans: spark standalone
>
>1. You have 4 nodes with each executor having 10G. How many actual
>executors do you see in UI (Port 4040 by default)
>
> Ans: There are 4 executor on which am using 8 cores (--total-executor-core
> 32)
>
>1. What is master memory? Are you referring to diver memory? May be I
>am misunderstanding this
>
> Ans: Driver memory is set as --drive-memory 5g
>
>1. The only real correlation I see with the driver memory is when you
>are running in local mode where worker lives within JVM process that you
>start with spark-shell etc. In that case driver memory matters. However, it
>appears that you are running in another mode with 4 nodes?
>
> Ans: I am running my job as spark-submit and on my worker(executor) node
> there is no OOM issue ,it only happening on driver app.
>
> Thanks,
> Saurav Sinha
>
> On Tue, Jul 19, 2016 at 2:42 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> can you please clarify:
>>
>>
>>1. In what mode are you running the spark standalone, yarn-client,
>>yarn cluster etc
>>2. You have 4 nodes with each executor having 10G. How many actual
>>executors do you see in UI (Port 4040 by default)
>>3. What is master memory? Are you referring to diver memory? May be I
>>am misunderstanding this
>>4. The only real correlation I see with the driver memory is when you
>>are running in local mode where worker lives within JVM process that you
>>start with spark-shell etc. In that case driver memory matters. However, 
>> it
>>appears that you are running in another mode with 4 nodes?
>>
>> Can you get a snapshot of your environment tab in UI and send the output
>> please?
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 18 July 2016 at 11:50, Saurav Sinha  wrote:
>>
>>> I have set --drive-memory 5g. I need to understand that as no of
>>> partition increase drive-memory need to be increased. What will be best
>>> ration of No of partition/drive-memory.
>>>
>>> On Mon, Jul 18, 2016 at 4:07 PM, Zhiliang Zhu 
>>> wrote:
>>>
 try to set --drive-memory xg , x would be as large as can be set .


 On Monday, July 18, 2016 6:31 PM, Saurav Sinha 
 wrote:


 Hi,

 I am running spark job.

 Master memory - 5G
 executor memort 10G(running on 4 node)

 My job is getting killed as no of partition increase to 20K.

 16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at
 WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
 16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage
 640(foreachPartition at WriteToKafka.java:45)
 16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage:
 List(ShuffleMapStage 518, ShuffleMapStage 639)
 16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
 16/07/18 14:53:23 INFO DAGScheduler: Submitting ResultStage 640
 (MapPartitionsRDD[271] at map at BuildSolrDocs.java:209), which has no
 missing
 parents
 16/07/18 14:53:23 INFO MemoryStore: ensureFreeSpace(8248) called with
 curMem=41923262, maxMem=2778778828
 16/07/18 14:53:23 INFO MemoryStore: Block broadcast_90 stored as values
 in memory (estimated size 8.1 KB, free 2.5 GB)
 Exception in thread "dag-scheduler-event-loop"
 java.lang.OutOfMemoryError: Java heap space
 at
 org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
 at
 org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
 at
 org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:294)
 at
 org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:273)
 at
 org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:197)
 at
 

Is it good choice to use DAO to store results generated by spark application?

2016-07-19 Thread Yu Wei
Hi guys,


I write spark application and want to store results generated by spark 
application to hbase.

Do I need to access hbase via java api directly?

Or is it better choice to use DAO similar as traditional RDBMS?  I suspect that 
there is major performance downgrade and other negative impacts using DAO. 
However, I have little knowledge in this field.


Any advice?


Thanks,

Jared




Re: Execute function once on each node

2016-07-19 Thread Josh Asplund
Technical limitations keep us from running another filesystem on the SSDs.
We are running on a very large HPC cluster without control over low-level
system components. We have tried setting up an ad-hoc HDFS cluster on the
nodes in our allocation, but we have had very little luck. It ends up being
very brittle and difficult for the simulation code to access.

On Tue, Jul 19, 2016 at 7:08 AM Koert Kuipers  wrote:

> The whole point of a well designed global filesystem is to not move the
> data
>
> On Jul 19, 2016 10:07, "Koert Kuipers"  wrote:
>
>> If you run hdfs on those ssds (with low replication factor) wouldn't it
>> also effectively write to local disk with low latency?
>>
>> On Jul 18, 2016 21:54, "Josh Asplund"  wrote:
>>
>> The spark workers are running side-by-side with scientific simulation
>> code. The code writes output to local SSDs to keep latency low. Due to the
>> volume of data being moved (10's of terabytes +), it isn't really feasible
>> to copy the data to a global filesystem. Executing a function on each node
>> would allow us to read the data in situ without a copy.
>>
>> I understand that manually assigning tasks to nodes reduces fault
>> tolerance, but the simulation codes already explicitly assign tasks, so a
>> failure of any one node is already a full-job failure.
>>
>> On Mon, Jul 18, 2016 at 3:43 PM Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> You can't assume that the number to nodes will be constant as some may
>>> fail, hence you can't guarantee that a function will execute at most once
>>> or atleast once on a node. Can you explain your use case in a bit more
>>> detail?
>>>
>>> On Mon, Jul 18, 2016, 10:57 PM joshuata  wrote:
>>>
 I am working on a spark application that requires the ability to run a
 function on each node in the cluster. This is used to read data from a
 directory that is not globally accessible to the cluster. I have tried
 creating an RDD with n elements and n partitions so that it is evenly
 distributed among the n nodes, and then mapping a function over the RDD.
 However, the runtime makes no guarantees that each partition will be
 stored
 on a separate node. This means that the code will run multiple times on
 the
 same node while never running on another.

 I have looked through the documentation and source code for both RDDs
 and
 the scheduler, but I haven't found anything that will do what I need.
 Does
 anybody know of a solution I could use?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>


Re: Execute function once on each node

2016-07-19 Thread Josh Asplund
Thank you for that advice. I have tried similar techniques, but not that
one.

On Mon, Jul 18, 2016 at 11:42 PM Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Thanks for the explanation. Try creating a custom RDD whose getPartitions
> returns an array of custom partition objects of size n (= number of nodes).
> In a custom partition object, you can have the file path and ip/hostname
> where the partition needs to be computed. Then, have getPreferredLocations
> return the ip/hostname from the partition object and in compute function,
> assert that you are in right ip/hostname (or fail) and read the content of
> the file.
>
> Not a 100% sure it will work though.
>
> On Tue, Jul 19, 2016, 2:54 AM Josh Asplund  wrote:
>
>> The spark workers are running side-by-side with scientific simulation
>> code. The code writes output to local SSDs to keep latency low. Due to the
>> volume of data being moved (10's of terabytes +), it isn't really feasible
>> to copy the data to a global filesystem. Executing a function on each node
>> would allow us to read the data in situ without a copy.
>>
>> I understand that manually assigning tasks to nodes reduces fault
>> tolerance, but the simulation codes already explicitly assign tasks, so a
>> failure of any one node is already a full-job failure.
>>
>> On Mon, Jul 18, 2016 at 3:43 PM Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> You can't assume that the number to nodes will be constant as some may
>>> fail, hence you can't guarantee that a function will execute at most once
>>> or atleast once on a node. Can you explain your use case in a bit more
>>> detail?
>>>
>>> On Mon, Jul 18, 2016, 10:57 PM joshuata  wrote:
>>>
 I am working on a spark application that requires the ability to run a
 function on each node in the cluster. This is used to read data from a
 directory that is not globally accessible to the cluster. I have tried
 creating an RDD with n elements and n partitions so that it is evenly
 distributed among the n nodes, and then mapping a function over the RDD.
 However, the runtime makes no guarantees that each partition will be
 stored
 on a separate node. This means that the code will run multiple times on
 the
 same node while never running on another.

 I have looked through the documentation and source code for both RDDs
 and
 the scheduler, but I haven't found anything that will do what I need.
 Does
 anybody know of a solution I could use?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Error in Word Count Program

2016-07-19 Thread Jakob Odersky
Does the file /home/user/spark-1.5.1-bin-hadoop2.4/bin/README.md exist?

On Tue, Jul 19, 2016 at 4:30 AM, RK Spark  wrote:

> val textFile = sc.textFile("README.md")val linesWithSpark =
> textFile.filter(line => line.contains("Spark"))
> linesWithSpark.saveAsTextFile("output1")
>
>
> Same error:
>
> org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
> file:/home/user/spark-1.5.1-bin-hadoop2.4/bin/README.md
>


Re: I'm trying to understand how to compile Spark

2016-07-19 Thread Jakob Odersky
Hi Eli,

to build spark, just run

build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests package

in your source directory, where package is the actual word "package".
This will recompile the whole project, so it may take a while when
running the first time.
Replacing a single file in an existing jar is not recommended unless
it is for a quick test, so I would also suggest that you give your
local spark compilation a custom version as to avoid any ambiguity if
you depend on it from somewhere else.

Check out this page
http://spark.apache.org/docs/1.4.1/building-spark.html for more
detailed information on the build process.

--jakob


On Tue, Jul 19, 2016 at 6:42 AM, Ted Yu  wrote:
> org.apache.spark.mllib.fpm is not a maven goal.
>
> -pl is For Individual Projects.
>
> Your first build action should not include -pl.
>
>
> On Tue, Jul 19, 2016 at 4:22 AM, Eli Super  wrote:
>>
>> Hi
>>
>> I have a windows laptop
>>
>> I just downloaded the spark 1.4.1 source code.
>>
>> I try to compile org.apache.spark.mllib.fpm with mvn
>>
>> My goal is to replace original org\apache\spark\mllib\fpm\* in
>> spark-assembly-1.4.1-hadoop2.6.0.jar
>>
>> As I understand from this link
>>
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse
>>
>>
>> I need to execute following command : build/mvn package -DskipTests -pl
>> assembly
>> I executed : mvn org.apache.spark.mllib.fpm  -DskipTests -pl assembly
>>
>> Then I got an error
>>  [INFO] Scanning for projects...
>> [ERROR] [ERROR] Could not find the selected project in the reactor:
>> assembly @
>>
>> Thanks for any help
>>
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is that possible to launch spark streaming application on yarn with only one machine?

2016-07-19 Thread Yu Wei
Thanks very much for your help.

Finally I understood the deploy mode with your explanation after trying 
different approach on my development environment.

Thanks again.


From: Yu Wei 
Sent: Saturday, July 9, 2016 3:04:40 PM
To: Rabin Banerjee
Cc: Mich Talebzadeh; Deng Ching-Mallete; user
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?


I tried to flush the information to external system in cluster mode. It works 
well.

I suspect that in yarn cluster mode, stdout is closed.




From: Rabin Banerjee 
Sent: Saturday, July 9, 2016 4:22:10 AM
To: Yu Wei
Cc: Mich Talebzadeh; Deng Ching-Mallete; user
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?


Ya , I mean dump in hdfs as a file ,via yarn cluster mode .

On Jul 8, 2016 3:10 PM, "Yu Wei" 
> wrote:

How could I dump data into text file? Writing to HDFS or other approach?


Thanks,

Jared


From: Rabin Banerjee 
>
Sent: Thursday, July 7, 2016 7:04:29 PM
To: Yu Wei
Cc: Mich Talebzadeh; user; Deng Ching-Mallete
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?


In that case, I suspect that Mqtt is not getting data while you are submitting  
in yarn cluster .

Can you please try dumping data in text file instead of printing while 
submitting in yarn cluster mode.?

On Jul 7, 2016 12:46 PM, "Yu Wei" 
> wrote:

Yes. Thanks for your clarification.

The problem I encountered is that in yarn cluster mode, no output for 
"DStream.print()" in yarn logs.


In spark implementation org/apache/spark/streaming/dstream/DStream.scala, the 
logs related with "Time" was printed out. However, other information for 
firstNum.take(num).foreach(println) was not printed in logs.

What's the root cause for the behavior difference?


/**
   * Print the first ten elements of each RDD generated in this DStream. This 
is an output
   * operator, so this DStream will be registered as an output stream and there 
materialized.
   */
  def print(): Unit = ssc.withScope {
print(10)
  }

  /**
   * Print the first num elements of each RDD generated in this DStream. This 
is an output
   * operator, so this DStream will be registered as an output stream and there 
materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
  (rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("---")
println("Time: " + time)
println("---")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
  }
}


Thanks,

Jared



From: Rabin Banerjee 
>
Sent: Thursday, July 7, 2016 1:04 PM
To: Yu Wei
Cc: Mich Talebzadeh; Deng Ching-Mallete; 
user@spark.apache.org
Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?

In yarn cluster mode , Driver is running in AM , so you can find the logs in 
that AM log . Open rersourcemanager UI , and check for the Job and logs. or 
yarn logs -applicationId 

In yarn client mode , the driver is the same JVM from where you are launching 
,,So you are getting it in the log .

On Thu, Jul 7, 2016 at 7:56 AM, Yu Wei 
> wrote:

Launching via client deploy mode, it works again.

I'm still a little confused about the behavior difference for cluster and 
client mode on a single machine.


Thanks,

Jared


From: Mich Talebzadeh 
>
Sent: Wednesday, July 6, 2016 9:46:11 PM
To: Yu Wei
Cc: Deng Ching-Mallete; user@spark.apache.org

Subject: Re: Is that possible to launch spark streaming application on yarn 
with only one machine?

Deploy-mode cluster don't think will work.

Try --master yarn --deploy-mode client

FYI


  *   Spark Local - Spark runs on the local host. This is the simplest set up 
and best suited for learners who want to understand different concepts of Spark 
and those performing unit testing.

  *   Spark Standalone – a simple cluster manager included with Spark that 
makes it easy to set up a cluster.

  *   YARN Cluster Mode, the Spark driver runs inside an application master 
process which is managed by YARN on the cluster, and the client can go away 
after 

Re: Latest 200 messages per topic

2016-07-19 Thread Cody Koeninger
Unless you're using only 1 partition per topic, there's no reasonable
way of doing this.  Offsets for one topicpartition do not necessarily
have anything to do with offsets for another topicpartition.  You
could do the last (200 / number of partitions) messages per
topicpartition, but you have no guarantee as to the time those events
represent, especially if your producers are misbehaving.  To be
perfectly clear, this is a consequence of the Kafka data model, and
has nothing to do with spark.

So, given that it's a bad idea and doesn't really do what you're
asking...  you can do this using KafkaUtils.createRDD

On Sat, Jul 16, 2016 at 10:43 AM, Rabin Banerjee
 wrote:
> Just to add ,
>
>   I want to read the MAX_OFFSET of a topic , then read MAX_OFFSET-200 ,
> every time .
>
> Also I want to know , If I want to fetch a specific offset range for Batch
> processing, is there any option for doing that ?
>
>
>
>
> On Sat, Jul 16, 2016 at 9:08 PM, Rabin Banerjee
>  wrote:
>>
>> HI All,
>>
>>I have 1000 kafka topics each storing messages for different devices .
>> I want to use the direct approach for connecting kafka from Spark , in which
>> I am only interested in latest 200 messages in the Kafka .
>>
>> How do I do that ?
>>
>> Thanks.
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming takes longer time to read json into dataframes

2016-07-19 Thread Cody Koeninger
Yes, if you need more parallelism, you need to either add more kafka
partitions or shuffle in spark.

Do you actually need the dataframe api, or are you just using it as a
way to infer the json schema?  Inferring the schema is going to
require reading through the RDD once before doing any other work.  You
may be better off defining your schema in advance.

On Sun, Jul 17, 2016 at 9:33 PM, Diwakar Dhanuskodi
 wrote:
> Hi,
>
> Repartition would  create  shuffle  over  network  which  I should  avoid
> to  reduce processing time because the size of messages at most in a batch
> will  be  5G.
>  Partitioning topic and parallelize receiving in Direct Stream might do  the
> trick.
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Martin Eden 
> Date:16/07/2016 14:01 (GMT+05:30)
> To: Diwakar Dhanuskodi 
> Cc: user 
> Subject: Re: Spark streaming takes longer time to read json into dataframes
>
> Hi,
>
> I would just do a repartition on the initial direct DStream since otherwise
> each RDD in the stream has exactly as many partitions as you have partitions
> in the Kafka topic (in your case 1). Like that receiving is still done in
> only 1 thread but at least the processing further down is done in parallel.
>
> If you want to parallelize your receiving as well I would partition my Kafka
> topic and then the RDDs in the initial DStream will have as many partitions
> as you set in Kafka.
>
> Have you seen this?
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> M
>
> On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi
>  wrote:
>>
>>
>> -- Forwarded message --
>> From: Diwakar Dhanuskodi 
>> Date: Sat, Jul 16, 2016 at 9:30 AM
>> Subject: Re: Spark streaming takes longer time to read json into
>> dataframes
>> To: Jean Georges Perrin 
>>
>>
>> Hello,
>>
>> I need it on memory.  Increased executor memory to 25G and executor cores
>> to 3. Got same result. There is always one task running under executor for
>> rdd.read.json() because rdd partition size is 1 . Doing hash partitioning
>> inside foreachRDD is a good approach?
>>
>> Regards,
>> Diwakar.
>>
>> On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin  wrote:
>>>
>>> Do you need it on disk or just push it to memory? Can you try to increase
>>> memory or # of cores (I know it sounds basic)
>>>
>>> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi
>>> >  wrote:
>>> >
>>> > Hello,
>>> >
>>> > I have 400K json messages pulled from Kafka into spark streaming using
>>> > DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
>>> > single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
>>> > convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
>>> > dataframe.
>>> >
>>> > I am running in Yarn client mode with executor memory as 15G and
>>> > executor cores as 2.
>>> >
>>> > Caching rdd before converting into dataframe  doesn't change processing
>>> > time. Whether introducing hash partitions inside foreachRDD  will help? 
>>> > (or)
>>> > Will partitioning topic and have more than one DirectStream help?. How 
>>> > can I
>>> > approach this situation to reduce time in converting to dataframe..
>>> >
>>> > Regards,
>>> > Diwakar.
>>>
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Building standalone spark application via sbt

2016-07-19 Thread Andrew Ehrlich
Yes, spark-core will depend on Hadoop and several other jars.  Here’s the list 
of dependencies: https://github.com/apache/spark/blob/master/core/pom.xml#L35 


Whether you need spark-sql depends on whether you will use the DataFrame API. 
Without spark-sql, you will just have the RDD API.

> On Jul 19, 2016, at 7:09 AM, Sachin Mittal  wrote:
> 
> 
> Hi,
> Can someone please guide me what all jars I need to place in my lib folder of 
> the project to build a standalone scala application via sbt.
> 
> Note I need to provide static dependencies and I cannot download the jars 
> using libraryDependencies.
> So I need to provide all the jars upfront.
> 
> So far I found that we need:
> spark-core_.jar
> 
> Do we also need
> spark-sql_.jar
> and
> hadoop-core-.jar
> 
> Is there any jar from spark side I may be missing? What I found that 
> spark-core needs hadoop-core classes and if I don't add them then sbt was 
> giving me this error:
> [error] bad symbolic reference. A signature in SparkContext.class refers to 
> term hadoop
> [error] in package org.apache which is not available.
> 
> So I was just confused on library dependency part when building an 
> application via sbt. Any inputs here would be helpful.
> 
> Thanks
> Sachin
> 
> 
> 



Building standalone spark application via sbt

2016-07-19 Thread Sachin Mittal
Hi,
Can someone please guide me what all jars I need to place in my lib folder
of the project to build a standalone scala application via sbt.

Note I need to provide static dependencies and I cannot download the jars
using libraryDependencies.
So I need to provide all the jars upfront.

So far I found that we need:
spark-core_.jar

Do we also need
spark-sql_.jar
and
hadoop-core-.jar

Is there any jar from spark side I may be missing? What I found that
spark-core needs hadoop-core classes and if I don't add them then sbt was
giving me this error:
[error] bad symbolic reference. A signature in SparkContext.class refers to
term hadoop
[error] in package org.apache which is not available.

So I was just confused on library dependency part when building an
application via sbt. Any inputs here would be helpful.

Thanks
Sachin


Re: Execute function once on each node

2016-07-19 Thread Koert Kuipers
The whole point of a well designed global filesystem is to not move the data

On Jul 19, 2016 10:07, "Koert Kuipers"  wrote:

> If you run hdfs on those ssds (with low replication factor) wouldn't it
> also effectively write to local disk with low latency?
>
> On Jul 18, 2016 21:54, "Josh Asplund"  wrote:
>
> The spark workers are running side-by-side with scientific simulation
> code. The code writes output to local SSDs to keep latency low. Due to the
> volume of data being moved (10's of terabytes +), it isn't really feasible
> to copy the data to a global filesystem. Executing a function on each node
> would allow us to read the data in situ without a copy.
>
> I understand that manually assigning tasks to nodes reduces fault
> tolerance, but the simulation codes already explicitly assign tasks, so a
> failure of any one node is already a full-job failure.
>
> On Mon, Jul 18, 2016 at 3:43 PM Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> You can't assume that the number to nodes will be constant as some may
>> fail, hence you can't guarantee that a function will execute at most once
>> or atleast once on a node. Can you explain your use case in a bit more
>> detail?
>>
>> On Mon, Jul 18, 2016, 10:57 PM joshuata  wrote:
>>
>>> I am working on a spark application that requires the ability to run a
>>> function on each node in the cluster. This is used to read data from a
>>> directory that is not globally accessible to the cluster. I have tried
>>> creating an RDD with n elements and n partitions so that it is evenly
>>> distributed among the n nodes, and then mapping a function over the RDD.
>>> However, the runtime makes no guarantees that each partition will be
>>> stored
>>> on a separate node. This means that the code will run multiple times on
>>> the
>>> same node while never running on another.
>>>
>>> I have looked through the documentation and source code for both RDDs and
>>> the scheduler, but I haven't found anything that will do what I need.
>>> Does
>>> anybody know of a solution I could use?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>


Re: I'm trying to understand how to compile Spark

2016-07-19 Thread Ted Yu
org.apache.spark.mllib.fpm is not a maven goal.

-pl is For Individual Projects.

Your first build action should not include -pl.


On Tue, Jul 19, 2016 at 4:22 AM, Eli Super  wrote:

> Hi
>
> I have a windows laptop
>
> I just downloaded the spark 1.4.1 source code.
>
> I try to compile *org.apache.spark.mllib.fpm* with *mvn *
>
> My goal is to replace *original *org\apache\spark\mllib\fpm\* in
> *spark-assembly-1.4.1-hadoop2.6.0.jar*
>
> As I understand from this link
>
>
> https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse
>
>
> I need to execute following command : build/mvn package -DskipTests -pl
> assembly
> I executed : mvn org.apache.spark.mllib.fpm  -DskipTests -pl assembly
>
> Then I got an error
>  [INFO] Scanning for projects...
> [ERROR] [ERROR] Could not find the selected project in the reactor:
> assembly @
>
> Thanks for any help
>
>
>
>


Re: Spark ResourceLeak??

2016-07-19 Thread Ted Yu
ResourceLeakDetector doesn't seem to be from Spark.

Please check dependencies for potential leak.

Cheers

On Tue, Jul 19, 2016 at 6:11 AM, Guruji  wrote:

> I am running a Spark Cluster on Mesos. The module reads data from Kafka as
> DirectStream and pushes it into elasticsearch after referring to a redis
> for
> getting Names against IDs.
>
> I have been getting this message in my worker logs.
>
> *16/07/19 11:17:44 ERROR ResourceLeakDetector: LEAK: You are creating too
> many HashedWheelTimer instances.  HashedWheelTimer is a shared resource
> that
> must be reused across the JVM,so that only a few instances are created.
> *
>
> Can't figure out the reason for the Resource Leak. Although when this
> happens, the Batches start slowing down and the pending Queue increases.
> There is hardly going back from there, other than killing it and starting
> it
> again.
>
> Any idea why the resource leak? This message seems to be related to akka
> when I googled. I am using Spark 1.6.2.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ResourceLeak-tp27355.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark ResourceLeak?

2016-07-19 Thread saurabh guru
I am running a Spark Cluster on Mesos. The module reads data from Kafka as
DirectStream and pushes it into elasticsearch after referring to a redis
for getting Names against IDs.

I have been getting this message in my worker logs.


*16/07/19 11:17:44 ERROR ResourceLeakDetector: LEAK: You are creating too
many HashedWheelTimer instances.  HashedWheelTimer is a shared resource
that must be reused across the JVM,so that only a few instances are
created. *

Can't figure out the reason for the Resource Leak. Although when this
happens, the Batches start slowing down and the pending Queue increases.
There is hardly going back from there, other than killing it and starting
it again.

Any idea why the resource leak? This message seems to be related to akka
when I googled. I am using Spark 1.6.2.

-- 
Thanks,
Saurabh


Spark ResourceLeak??

2016-07-19 Thread Guruji
I am running a Spark Cluster on Mesos. The module reads data from Kafka as
DirectStream and pushes it into elasticsearch after referring to a redis for
getting Names against IDs.

I have been getting this message in my worker logs. 

*16/07/19 11:17:44 ERROR ResourceLeakDetector: LEAK: You are creating too
many HashedWheelTimer instances.  HashedWheelTimer is a shared resource that
must be reused across the JVM,so that only a few instances are created.
*

Can't figure out the reason for the Resource Leak. Although when this
happens, the Batches start slowing down and the pending Queue increases.
There is hardly going back from there, other than killing it and starting it
again.

Any idea why the resource leak? This message seems to be related to akka
when I googled. I am using Spark 1.6.2.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ResourceLeak-tp27355.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Error in Word Count Program

2016-07-19 Thread RK Spark
val textFile = sc.textFile("README.md")val linesWithSpark =
textFile.filter(line => line.contains("Spark"))
linesWithSpark.saveAsTextFile("output1")


Same error:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
file:/home/user/spark-1.5.1-bin-hadoop2.4/bin/README.md


I'm trying to understand how to compile Spark

2016-07-19 Thread Eli Super
Hi

I have a windows laptop

I just downloaded the spark 1.4.1 source code.

I try to compile *org.apache.spark.mllib.fpm* with *mvn *

My goal is to replace *original *org\apache\spark\mllib\fpm\* in
*spark-assembly-1.4.1-hadoop2.6.0.jar*

As I understand from this link

https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse


I need to execute following command : build/mvn package -DskipTests -pl
assembly
I executed : mvn org.apache.spark.mllib.fpm  -DskipTests -pl assembly

Then I got an error
 [INFO] Scanning for projects...
[ERROR] [ERROR] Could not find the selected project in the reactor:
assembly @

Thanks for any help


which one spark ml or spark mllib

2016-07-19 Thread pseudo oduesp
HI,

i don't have any idea why we have to library  ML and mlib

ml you can use it with data frame and mllib with rdd but ml have some lakes
like:
save model most important if you want create web api with score

my question why we don't have all features in MLlib on ML ?


( i use pyspark 1.5.0  because entreise restrict  of all a)


Re: spark-submit local and Akka startup timeouts

2016-07-19 Thread Rory Waite
Sorry Bryan, I should have mentioned that I'm running 1.6.0 for hadoop2.6. The 
binaries were downloaded from the Spark website.


We're free to upgrade to Spark, create custom builds, etc. Please let me know 
how to display the config property.

 [http://dr0muzwhcp26z.cloudfront.net/static/corporate/SDL-logo-2014.png] 

www.sdl.com


SDL PLC confidential, all rights reserved. If you are not the intended 
recipient of this mail SDL requests and requires that you delete it without 
acting upon or copying any of its contents, and we further request that you 
advise us.

SDL PLC is a public limited company registered in England and Wales. Registered 
number: 02675207.
Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6 7DY, 
UK.


From: Bryan Cutler 
Sent: 19 July 2016 02:20:38
To: Rory Waite
Cc: user
Subject: Re: spark-submit local and Akka startup timeouts


Hi Rory, for starters what version of Spark are you using?  I believe that in a 
1.5.? release (I don't know which one off the top of my head) there was an 
addition that would also display the config property when a timeout happened.  
That might help some if you are able to upgrade.

On Jul 18, 2016 9:34 AM, "Rory Waite" > 
wrote:
Hi All,

We have created a regression test for a spark job that is executed during our 
automated build. It executes a spark-submit with a local master, processes some 
data, and the exits. We have an issue in that we get a non-deterministic 
timeout error. It seems to be when the spark context tries to initialise Akka 
(stack trace below). It doesn't happen often, but when it does it causes the 
whole build to fail.

The machines that run these tests get very heavily loaded, with many regression 
tests running simultaneously. My theory is that the spark-submit is sometimes 
unable to initialise Akka in time because the machines are so heavily loaded 
with the other tests. My first thought was to try to tune some parameter to 
extend the timeout, but I couldn't find anything in the documentation. The 
timeout is short at 10s, whereas the default akka timeout is set at 100s.

Is there a way to adjust this timeout?

16/07/17 00:04:22 ERROR SparkContext: Error initializing SparkContext.
java.util.concurrent.TimeoutException: Futures timed out after [1 
milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:179)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at 
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
at org.apache.spark.SparkContext.(SparkContext.scala:457)
at com.sdl.nntrainer.NNTrainer$.main(NNTrainer.scala:418)
at com.sdl.nntrainer.NNTrainer.main(NNTrainer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/07/17 00:04:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down 
remote daemon.
16/07/17 00:04:22 INFO SparkContext: Successfully 

Scala code as "spark view"

2016-07-19 Thread wdaehn
Using Spark via the Thrift server is fine and good but it limits yourself to
simple SQL queries. For all complex Spark logic you have to submit a job
first, write the result into a table and then query the table.
This has obviously the limitation that 
a) The user executing the query cannot pass in information
b) The user executing the query has no idea how current the intermediate
table is
c) Requires to compile the analytic logic into a jar file, upload it, submit
it

Using spark-shell all is much more interactive obviously, you write the
Scala code line by line and visualize the result. But obviously not via
JDBC/Thrift.

Wouldn't it make sense to support a syntax like 

create temporary table myview using Scala options (sourcecode
'val dataframe = sqlContext..
%table dataframe'
);


Example: 
There is a directory with millions of files.
A trained MLlib model is used to categorize these files, output is a
dataframe.
Via JDBC you want to get the categorization of all files with the name
text_2016_07_*.txt only.


Does it make sense? I can't see how this could be done today without a lot
of disadvantages but I am far from being an expert, so please bare with me.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Scala-code-as-spark-view-tp27353.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dynamically get value based on Map key in Spark Dataframe

2016-07-19 Thread Jacek Laskowski
Hi Divya,

That's the right way to access a value for a key in a broadcast map.
I'm pretty sure tough that you could do the same (or similar) with
higher-level broadcast Datasets. Try it out!

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jul 19, 2016 at 4:31 AM, Divya Gehlot  wrote:
> Hi Jacek,
>
> Can you please share example how can I access broacasted map
> val pltStnMapBrdcst = sc.broadcast(keyvalueMap )
> val df_replacekeys =
> df_input.withColumn("map_values",pltStnMapBrdcst.value.get("key"
>
> Is the above the right way to access the broadcasted map ?
>
>
>
> Thanks,
> Divya
>
>
> On 18 July 2016 at 23:06, Jacek Laskowski  wrote:
>>
>> See broadcast variable.
>>
>> Or (just a thought) do join between DataFrames.
>>
>> Jacek
>>
>>
>> On 18 Jul 2016 9:24 a.m., "Divya Gehlot"  wrote:
>>>
>>> Hi,
>>>
>>> I have created a map by reading a text file
>>> val keyValueMap = file_read.map(t => t.getString(0) ->
>>> t.getString(4)).collect().toMap
>>>
>>> Now I have another dataframe where I need to dynamically replace all the
>>> keys of Map with values
>>> val df_input = reading the file as dataframe
>>> val df_replacekeys = df_input.withColumn("map_values",lit(keyValueMap
>>> (col("key"
>>>
>>> Would really appreciate the help .
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark driver getting out of memory

2016-07-19 Thread Saurav Sinha
Hi Mich,

   1. In what mode are you running the spark standalone, yarn-client, yarn
   cluster etc

Ans: spark standalone

   1. You have 4 nodes with each executor having 10G. How many actual
   executors do you see in UI (Port 4040 by default)

Ans: There are 4 executor on which am using 8 cores (--total-executor-core
32)

   1. What is master memory? Are you referring to diver memory? May be I am
   misunderstanding this

Ans: Driver memory is set as --drive-memory 5g

   1. The only real correlation I see with the driver memory is when you
   are running in local mode where worker lives within JVM process that you
   start with spark-shell etc. In that case driver memory matters. However, it
   appears that you are running in another mode with 4 nodes?

Ans: I am running my job as spark-submit and on my worker(executor) node
there is no OOM issue ,it only happening on driver app.

Thanks,
Saurav Sinha

On Tue, Jul 19, 2016 at 2:42 AM, Mich Talebzadeh 
wrote:

> can you please clarify:
>
>
>1. In what mode are you running the spark standalone, yarn-client,
>yarn cluster etc
>2. You have 4 nodes with each executor having 10G. How many actual
>executors do you see in UI (Port 4040 by default)
>3. What is master memory? Are you referring to diver memory? May be I
>am misunderstanding this
>4. The only real correlation I see with the driver memory is when you
>are running in local mode where worker lives within JVM process that you
>start with spark-shell etc. In that case driver memory matters. However, it
>appears that you are running in another mode with 4 nodes?
>
> Can you get a snapshot of your environment tab in UI and send the output
> please?
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 18 July 2016 at 11:50, Saurav Sinha  wrote:
>
>> I have set --drive-memory 5g. I need to understand that as no of
>> partition increase drive-memory need to be increased. What will be best
>> ration of No of partition/drive-memory.
>>
>> On Mon, Jul 18, 2016 at 4:07 PM, Zhiliang Zhu 
>> wrote:
>>
>>> try to set --drive-memory xg , x would be as large as can be set .
>>>
>>>
>>> On Monday, July 18, 2016 6:31 PM, Saurav Sinha 
>>> wrote:
>>>
>>>
>>> Hi,
>>>
>>> I am running spark job.
>>>
>>> Master memory - 5G
>>> executor memort 10G(running on 4 node)
>>>
>>> My job is getting killed as no of partition increase to 20K.
>>>
>>> 16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at
>>> WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
>>> 16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage
>>> 640(foreachPartition at WriteToKafka.java:45)
>>> 16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage:
>>> List(ShuffleMapStage 518, ShuffleMapStage 639)
>>> 16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
>>> 16/07/18 14:53:23 INFO DAGScheduler: Submitting ResultStage 640
>>> (MapPartitionsRDD[271] at map at BuildSolrDocs.java:209), which has no
>>> missing
>>> parents
>>> 16/07/18 14:53:23 INFO MemoryStore: ensureFreeSpace(8248) called with
>>> curMem=41923262, maxMem=2778778828
>>> 16/07/18 14:53:23 INFO MemoryStore: Block broadcast_90 stored as values
>>> in memory (estimated size 8.1 KB, free 2.5 GB)
>>> Exception in thread "dag-scheduler-event-loop"
>>> java.lang.OutOfMemoryError: Java heap space
>>> at
>>> org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
>>> at
>>> org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
>>> at
>>> org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:294)
>>> at
>>> org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:273)
>>> at
>>> org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:197)
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
>>>
>>>
>>> Help needed.
>>>
>>> --
>>> Thanks and Regards,
>>>
>>> Saurav Sinha
>>>
>>> Contact: 9742879062
>>>
>>>
>>>
>>
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>
>


-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Re: Execute function once on each node

2016-07-19 Thread Aniket Bhatnagar
Thanks for the explanation. Try creating a custom RDD whose getPartitions
returns an array of custom partition objects of size n (= number of nodes).
In a custom partition object, you can have the file path and ip/hostname
where the partition needs to be computed. Then, have getPreferredLocations
return the ip/hostname from the partition object and in compute function,
assert that you are in right ip/hostname (or fail) and read the content of
the file.

Not a 100% sure it will work though.

On Tue, Jul 19, 2016, 2:54 AM Josh Asplund  wrote:

> The spark workers are running side-by-side with scientific simulation
> code. The code writes output to local SSDs to keep latency low. Due to the
> volume of data being moved (10's of terabytes +), it isn't really feasible
> to copy the data to a global filesystem. Executing a function on each node
> would allow us to read the data in situ without a copy.
>
> I understand that manually assigning tasks to nodes reduces fault
> tolerance, but the simulation codes already explicitly assign tasks, so a
> failure of any one node is already a full-job failure.
>
> On Mon, Jul 18, 2016 at 3:43 PM Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> You can't assume that the number to nodes will be constant as some may
>> fail, hence you can't guarantee that a function will execute at most once
>> or atleast once on a node. Can you explain your use case in a bit more
>> detail?
>>
>> On Mon, Jul 18, 2016, 10:57 PM joshuata  wrote:
>>
>>> I am working on a spark application that requires the ability to run a
>>> function on each node in the cluster. This is used to read data from a
>>> directory that is not globally accessible to the cluster. I have tried
>>> creating an RDD with n elements and n partitions so that it is evenly
>>> distributed among the n nodes, and then mapping a function over the RDD.
>>> However, the runtime makes no guarantees that each partition will be
>>> stored
>>> on a separate node. This means that the code will run multiple times on
>>> the
>>> same node while never running on another.
>>>
>>> I have looked through the documentation and source code for both RDDs and
>>> the scheduler, but I haven't found anything that will do what I need.
>>> Does
>>> anybody know of a solution I could use?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>