Re: Unable to run Python Unit tests

2016-03-21 Thread Gayathri Murali
commit f3717fc7c97ea402c9ddf9020405070421eeb4a4

Thanks
Gayathri



On Mon, Mar 21, 2016 at 8:09 PM, Ted Yu  wrote:

> Can you tell us the commit hash your workspace is based on ?
>
> On Mon, Mar 21, 2016 at 8:05 PM, Gayathri Murali <
> gayathri.m.sof...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying to run ./python/run-tests on my local master branch. I am
>> getting the following error. I have run this multiple times before and
>> never had this issue. Can someone please help?
>>
>> *Error: Could not find or load main class
>> org.apache.spark.deploy.SparkSubmit*
>> *ERROR*
>>
>> *==*
>> *ERROR: setUpClass (pyspark.streaming.tests.BasicOperationTests)*
>> *--*
>> *Traceback (most recent call last):*
>> *  File "/Users/gayathri/spark/python/pyspark/streaming/tests.py", line
>> 60, in setUpClass*
>> *conf = SparkConf().set("spark.default.parallelism", 1)*
>> *  File "/Users/gayathri/spark/python/pyspark/conf.py", line 104, in
>> __init__*
>> *SparkContext._ensure_initialized()*
>> *  File "/Users/gayathri/spark/python/pyspark/context.py", line 245, in
>> _ensure_initialized*
>> *SparkContext._gateway = gateway or launch_gateway()*
>> *  File "/Users/gayathri/spark/python/pyspark/java_gateway.py", line 94,
>> in launch_gateway*
>> *raise Exception("Java gateway process exited before sending the
>> driver its port number")*
>> *Exception: Java gateway process exited before sending the driver its
>> port number*
>>
>> *There are more errors that follow. *
>>
>>
>> *Thanks*
>> *Gayathri*
>>
>>
>>
>


Re: Unable to run Python Unit tests

2016-03-21 Thread Ted Yu
Can you tell us the commit hash your workspace is based on ?

On Mon, Mar 21, 2016 at 8:05 PM, Gayathri Murali <
gayathri.m.sof...@gmail.com> wrote:

> Hi All,
>
> I am trying to run ./python/run-tests on my local master branch. I am
> getting the following error. I have run this multiple times before and
> never had this issue. Can someone please help?
>
> *Error: Could not find or load main class
> org.apache.spark.deploy.SparkSubmit*
> *ERROR*
>
> *==*
> *ERROR: setUpClass (pyspark.streaming.tests.BasicOperationTests)*
> *--*
> *Traceback (most recent call last):*
> *  File "/Users/gayathri/spark/python/pyspark/streaming/tests.py", line
> 60, in setUpClass*
> *conf = SparkConf().set("spark.default.parallelism", 1)*
> *  File "/Users/gayathri/spark/python/pyspark/conf.py", line 104, in
> __init__*
> *SparkContext._ensure_initialized()*
> *  File "/Users/gayathri/spark/python/pyspark/context.py", line 245, in
> _ensure_initialized*
> *SparkContext._gateway = gateway or launch_gateway()*
> *  File "/Users/gayathri/spark/python/pyspark/java_gateway.py", line 94,
> in launch_gateway*
> *raise Exception("Java gateway process exited before sending the
> driver its port number")*
> *Exception: Java gateway process exited before sending the driver its port
> number*
>
> *There are more errors that follow. *
>
>
> *Thanks*
> *Gayathri*
>
>
>


Unable to run Python Unit tests

2016-03-21 Thread Gayathri Murali
Hi All,

I am trying to run ./python/run-tests on my local master branch. I am
getting the following error. I have run this multiple times before and
never had this issue. Can someone please help?

*Error: Could not find or load main class
org.apache.spark.deploy.SparkSubmit*
*ERROR*

*==*
*ERROR: setUpClass (pyspark.streaming.tests.BasicOperationTests)*
*--*
*Traceback (most recent call last):*
*  File "/Users/gayathri/spark/python/pyspark/streaming/tests.py", line 60,
in setUpClass*
*conf = SparkConf().set("spark.default.parallelism", 1)*
*  File "/Users/gayathri/spark/python/pyspark/conf.py", line 104, in
__init__*
*SparkContext._ensure_initialized()*
*  File "/Users/gayathri/spark/python/pyspark/context.py", line 245, in
_ensure_initialized*
*SparkContext._gateway = gateway or launch_gateway()*
*  File "/Users/gayathri/spark/python/pyspark/java_gateway.py", line 94, in
launch_gateway*
*raise Exception("Java gateway process exited before sending the driver
its port number")*
*Exception: Java gateway process exited before sending the driver its port
number*

*There are more errors that follow. *


*Thanks*
*Gayathri*


Re: Work out date column in CSV more than 6 months old (datediff or something)

2016-03-21 Thread Silvio Fiorito
There’s a months_between function you could use, as well:

df.filter(months_between(current_date(), $"Payment Date") > 6).show()
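
A minimal sketch of that approach, assuming the "Payment date" column is the dd/MM/yyyy string shown in the question below — it has to be parsed into a date first, since months_between cannot interpret that format directly:

import org.apache.spark.sql.functions._

// Parse the dd/MM/yyyy string into a proper date column first (format assumed).
val withDate = df.withColumn("payment_dt",
  to_date(unix_timestamp(col("Payment date"), "dd/MM/yyyy").cast("timestamp")))

// Keep only rows whose payment date is more than 6 months before today.
withDate.filter(months_between(current_date(), col("payment_dt")) > 6).show()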

From: Mich Talebzadeh
Date: Monday, March 21, 2016 at 5:53 PM
To: "user @spark"
Subject: Work out date column in CSV more than 6 months old (datediff or something)

Hi,

For test purposes I am reading in a simple csv file as follows:

val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")
df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date: 
string, Net: string, VAT: string, Total: string]

For this work I am interested in column "Payment Date" > 6 months old from today

Data is stored in the following format for that column

scala> df.select("Payment date").take(2)
res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])

stored as 'dd/MM/'

The current time I get as

scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 
'dd/MM/') ").collect.apply(0).getString(0)
today: String = 21/03/2016


So I want to filter the csv file

scala>  df.filter(col("Payment date") < lit(today)).show(2)
+--++-+-+-+
|Invoice Number|Payment date|  Net|  VAT|Total|
+--++-+-+-+
|   360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
|   361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
+--++-+-+-+


However, I want to use datediff() function here not just < today!


Obviously one can store the file as a table and use SQL on it. However, I want 
to see if there are other ways using fp.

Thanks

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com




Re: cluster randomly re-starting jobs

2016-03-21 Thread Roberto Pagliari
Yes you are right. The job failed and it was re-attempting.

Thank you,


From: Daniel Siegmann
Date: Monday, 21 March 2016 21:33
To: Ted Yu
Cc: Roberto Pagliari, "user@spark.apache.org"
Subject: Re: cluster randomly re-starting jobs

Never used Ambari and I don't know if this is your problem, but I have seen 
similar behavior. In my case, my application failed and Hadoop kicked off a 
second attempt. I didn't realize this, but when I refreshed the Spark UI, 
suddenly everything seemed reset! This is because the application ID is part of 
the URL, but not the attempt ID, so when the context for the second attempt 
starts it will be at the same URL as the context for the first job.

To verify if this is the problem you could look at the application in the 
Hadoop console (or whatever the equivalent is on Ambari) and see if there are 
multiple attempts. You can also see it in the Spark history server (under 
incomplete applications, if the second attempt is still running).

~Daniel Siegmann

On Mon, Mar 21, 2016 at 9:58 AM, Ted Yu wrote:
Can you provide a bit more information ?

Release of Spark and YARN

Have you checked Spark UI / YARN job log to see if there is some clue ?

Cheers

On Mon, Mar 21, 2016 at 6:21 AM, Roberto Pagliari wrote:
I noticed that sometimes the spark cluster seems to restart the job completely.

In the Ambari UI (where I can check jobs/stages) everything that was done up to 
a certain point is removed, and the job is restarted.

Does anyone know what the issue could be?

Thank you,





pyspark sql convert long to timestamp?

2016-03-21 Thread Andy Davidson
I have a col in a data frame that is of type long. Any idea how I create a
column whose type is timestamp?

The long is unix epoch in ms

Thanks

Andy
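
A minimal sketch of one way to do this (shown in Scala; the equivalent expression works from pyspark), assuming a hypothetical long column named epoch_ms holding milliseconds since the epoch. In Spark SQL, casting a numeric value to timestamp treats it as seconds, so divide by 1000 first:

import org.apache.spark.sql.functions._

// Hypothetical input: df has a long column "epoch_ms" (milliseconds since the epoch).
// Casting a numeric column to timestamp interprets the value as seconds.
val withTs = df.withColumn("event_ts", (col("epoch_ms") / 1000).cast("timestamp"))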




ALS setIntermediateRDDStorageLevel

2016-03-21 Thread Roberto Pagliari
According to this thread

http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-question-td15420.html

There should be a function to set intermediate storage level in ALS. However, 
I'm getting method not found with Spark 1.6. Is it still available? If so, can 
I get to see a minimal example?

Thank you,
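
For reference, a rough sketch of how that setter is typically used with the RDD-based API (an assumption to check against the 1.6 API docs, not a verified answer): the method lives on the org.apache.spark.mllib.recommendation.ALS builder instance, not on the static ALS.train helpers and not on the DataFrame-based ml.recommendation.ALS, which is a common reason for a "method not found" error.

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.storage.StorageLevel

// Toy ratings just to make the snippet self-contained.
val ratings = sc.parallelize(Seq(Rating(1, 1, 5.0), Rating(1, 2, 3.0), Rating(2, 1, 4.0)))

// Builder-style configuration; setIntermediateRDDStorageLevel is assumed to be
// the instance method on this class -- verify against the version in use.
val model = new ALS()
  .setRank(10)
  .setIterations(10)
  .setIntermediateRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)
  .run(ratings)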



Work out date column in CSV more than 6 months old (datediff or something)

2016-03-21 Thread Mich Talebzadeh
Hi,

For test purposes I am reading in a simple csv file as follows:

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")
df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date:
string, Net: string, VAT: string, Total: string]

For this work I am interested in column "Payment Date" > 6 months old from
today

Data is stored in the following format for that column

scala> df.select("Payment date").take(2)
res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])

stored as 'dd/MM/'

The current time I get as

scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
'dd/MM/') ").collect.apply(0).getString(0)
today: String = 21/03/2016


So I want to filter the csv file

scala>  df.filter(col("Payment date") < lit(today)).show(2)
+--++-+-+-+
|Invoice Number|Payment date|  Net|  VAT|Total|
+--++-+-+-+
|   360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
|   361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
+--++-+-+-+


However, I want to use datediff() function here not just < today!


Obviously one can store the file as a table and use SQL on it. However, I
want to see if there are other ways using fp.

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com
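
A hedged sketch of the datediff() route for this, assuming the dd/MM/yyyy format above: the string column is parsed to a date first, and since datediff counts days, the 6-month cutoff has to be approximated (183 days here) — months_between is the more precise alternative.

import org.apache.spark.sql.functions._

val paid = df.withColumn("payment_dt",
  to_date(unix_timestamp(col("Payment date"), "dd/MM/yyyy").cast("timestamp")))

// datediff counts whole days, so "older than 6 months" is approximated as > 183 days.
paid.filter(datediff(current_date(), col("payment_dt")) > 183).show(2)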


Re: cluster randomly re-starting jobs

2016-03-21 Thread Daniel Siegmann
Never used Ambari and I don't know if this is your problem, but I have seen
similar behavior. In my case, my application failed and Hadoop kicked off a
second attempt. I didn't realize this, but when I refreshed the Spark UI,
suddenly everything seemed reset! This is because the application ID is
part of the URL, but not the attempt ID, so when the context for the second
attempt starts it will be at the same URL as the context for the first job.

To verify if this is the problem you could look at the application in the
Hadoop console (or whatever the equivalent is on Ambari) and see if there
are multiple attempts. You can also see it in the Spark history server
(under incomplete applications, if the second attempt is still running).

~Daniel Siegmann

On Mon, Mar 21, 2016 at 9:58 AM, Ted Yu  wrote:

> Can you provide a bit more information ?
>
> Release of Spark and YARN
>
> Have you checked Spark UI / YARN job log to see if there is some clue ?
>
> Cheers
>
> On Mon, Mar 21, 2016 at 6:21 AM, Roberto Pagliari <
> roberto.pagli...@asos.com> wrote:
>
>> I noticed that sometimes the spark cluster seems to restart the job
>> completely.
>>
>> In the Ambari UI (where I can check jobs/stages) everything that was done
>> up to a certain point is removed, and the job is restarted.
>>
>> Does anyone know what the issue could be?
>>
>> Thank you,
>>
>>
>


Re: SparkML algos limitations question.

2016-03-21 Thread Joseph Bradley
The indexing I mentioned is more restrictive than that: each index
corresponds to a unique position in a binary tree.  (I.e., the first index
of row 0 is 1, the first of row 1 is 2, the first of row 2 is 4, etc., IIRC)

You're correct that this restriction could be removed; with some careful
thought, we could probably avoid using indices altogether.  I just created
https://issues.apache.org/jira/browse/SPARK-14043  to track this.
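
A small sketch of the arithmetic behind that restriction (illustrative only, not the actual MLlib code): with positional indexing the children of node i sit at 2*i and 2*i + 1, so a node at depth d can have an index as large as 2^(d+1) - 1, which hits the signed 32-bit Int ceiling at depth 30 regardless of how many nodes the tree actually contains.

// Positional indexing in a binary tree: root = 1, children of i are 2*i and 2*i + 1.
def leftChildIndex(i: Long): Long  = 2 * i
def rightChildIndex(i: Long): Long = 2 * i + 1

// Largest index that can appear at a given depth (root is depth 0).
def maxIndexAtDepth(depth: Int): Long = (1L << (depth + 1)) - 1

println(maxIndexAtDepth(29))  // 1073741823 -- fits comfortably in an Int
println(maxIndexAtDepth(30))  // 2147483647 -- exactly Int.MaxValue
println(maxIndexAtDepth(31))  // 4294967295 -- no longer representable as an Int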

On Mon, Mar 21, 2016 at 11:22 AM, Eugene Morozov  wrote:

> Hi, Joseph,
>
> I thought I understood, why it has a limit of 30 levels for decision tree,
> but now I'm not that sure. I thought that's because the decision tree
> stored in the array, which has length of type int, which cannot be more,
> than 2^31-1.
> But here are my new discoveries. I've trained two different random forest
> models of 50 trees and different maxDepth (20 and 30) and specified node
> size = 5. Here are couple of those trees
>
> Model with maxDepth = 20:
> depth=20, numNodes=471
> depth=19, numNodes=497
>
> Model with maxDepth = 30:
> depth=30, numNodes=11347
> depth=30, numNodes=10963
>
> It looks like the tree is not pretty balanced and I understand why that
> happens, but I'm surprised that actual number of nodes way less, than 2^31
> - 1. And now I'm not sure of why the limitation actually exists. With tree
> that consist of 2^31 nodes it'd required to have 8G of memory just to store
> those indexes, so I'd say that depth isn't the biggest issue in such a
> case.
>
> Is it possible to workaround or simply miss maxDepth limitation (without
> codebase modification) to train the tree until I hit the max number of
> nodes? I'd assume that in most cases I simply won't hit it, but the depth
> of the tree would be much more, than 30.
>
>
> --
> Be well!
> Jean Morozov
>
> On Wed, Dec 16, 2015 at 1:00 AM, Joseph Bradley 
> wrote:
>
>> Hi Eugene,
>>
>> The maxDepth parameter exists because the implementation uses Integer
>> node IDs which correspond to positions in the binary tree.  This simplified
>> the implementation.  I'd like to eventually modify it to avoid depending on
>> tree node IDs, but that is not yet on the roadmap.
>>
>> There is not an analogous limit for the GLMs you listed, but I'm not very
>> familiar with the perceptron implementation.
>>
>> Joseph
>>
>> On Mon, Dec 14, 2015 at 10:52 AM, Eugene Morozov <
>> evgeny.a.moro...@gmail.com> wrote:
>>
>>> Hello!
>>>
>>> I'm currently working on POC and try to use Random Forest
>>> (classification and regression). I also have to check SVM and Multiclass
>>> perceptron (other algos are less important at the moment). So far I've
>>> discovered that Random Forest has a limitation of maxDepth for trees and
>>> just out of curiosity I wonder why such a limitation has been introduced?
>>>
>>> An actual question is that I'm going to use Spark ML in production next
>>> year and would like to know if there are other limitations like maxDepth in
>>> RF for other algorithms: Logistic Regression, Perceptron, SVM, etc.
>>>
>>> Thanks in advance for your time.
>>> --
>>> Be well!
>>> Jean Morozov
>>>
>>
>>
>


Re: sliding Top N window

2016-03-21 Thread Lars Albertsson
Hi,

If you can accept approximate top N results, there is a neat solution
for this problem: Use an approximate Map structure called
Count-Min Sketch, in combination with a list of the M top items, where
M > N. When you encounter an item not in the top M, you look up its
count in the Count-Min Sketch to determine whether it qualifies.

You will need to break down your event stream into time windows with a
certain time unit, e.g. minutes or hours, and keep one Count-Min
Sketch for each unit. The CMSs can be added, so you aggregate them to
form your sliding windows. You also keep a top M (aka "heavy hitters")
list for each window.

The data structures required are surprisingly small, and will likely
fit in memory on a single machine, if it can handle the traffic
volume, so you might not need Spark at all. If you choose to use Spark
in order to benefit from windowing, be aware that Spark lumps events
in micro batches based on processing time, not event time.

I made a presentation on approximate counting a couple of years ago.
Slides and video here:
http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105.
You can also search for presentation by Ted Dunning and Mikio Braun,
who have held good presentations on the subject.

There are AFAIK two open source implementations of Count-Min Sketch,
one of them in Algebird.
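
A bare-bones illustration of the Count-Min Sketch idea (a toy version, not Algebird's implementation): a small matrix of counters with one hash function per row; an increment bumps one cell per row, and the estimate is the minimum across rows, so it can over-count but never under-count. Merging two sketches of the same shape is cell-wise addition, which is what makes per-time-unit sketches easy to combine into sliding windows.

import scala.util.hashing.MurmurHash3

// depth = number of hash functions (rows), width = counters per row.
class CountMinSketch(depth: Int, width: Int) {
  private val table = Array.ofDim[Long](depth, width)

  // Use the row number as the hash seed; mask the sign bit to keep the bucket non-negative.
  private def bucket(item: String, row: Int): Int =
    (MurmurHash3.stringHash(item, row) & Int.MaxValue) % width

  def add(item: String, count: Long = 1L): Unit =
    for (row <- 0 until depth) table(row)(bucket(item, row)) += count

  // Estimated count: minimum over rows, so it never under-counts.
  def estimate(item: String): Long =
    (0 until depth).map(row => table(row)(bucket(item, row))).min

  // Cell-wise addition merges two sketches covering different time units.
  def merge(other: CountMinSketch): Unit =
    for (r <- 0 until depth; c <- 0 until width) table(r)(c) += other.table(r)(c)
}

val cms = new CountMinSketch(depth = 4, width = 1024)
Seq("p1", "p2", "p1", "p3", "p1").foreach(item => cms.add(item))
println(cms.estimate("p1"))  // at least 3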

Let me know if anything is unclear.

Good luck, and let us know how it goes.

Regards,



Lars Albertsson
Data engineering consultant
www.mapflat.com
+46 70 7687109


On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
 wrote:
> Good day,
>
> I have the following task: a stream of “page views” coming to a kafka topic. Each
> view contains a list of product Ids from a visited page. The task: to have the
> Top N products in “real time”.
>
> I am interested in a solution that would require minimal intermediate
> writes … so I need to build a sliding window for the top N products, where the
> product counters change dynamically and the window should present the top
> products for the specified period of time.
>
> I believe there is no way to avoid maintaining all the product counters
> in memory/storage.  But at least I would like to do all the logic, all the
> calculation on the fly, in memory, not spilling multiple RDDs from memory to
> disk.
>
> So I believe I see one way of doing it:
>   Take each msg from kafka and line up the elementary actions (increase by
> 1 the counter for the product PID).
>   Each action will be implemented as a call to HTable.increment()  // or
> easier, with incrementColumnValue()…
>   After each increment I can apply my own “offer” operation, which would ensure
> that only the top N products with counters are kept in another Hbase table (also
> with atomic operations).
>   But there is another stream of events: decreasing product counters when a
> view expires past the length of the sliding window…
>
> So my question: does anybody know of, or can share, a piece of code or know-how
> on how to implement a “sliding Top N window” better?
> If nothing is offered, I will share what I come up with myself.
>
> Thank you
> Alexey

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can not kill driver properly

2016-03-21 Thread Shixiong(Ryan) Zhu
Could you post the log of Master?

On Mon, Mar 21, 2016 at 9:25 AM, Hao Ren  wrote:

> Update:
>
> I am using --supervise flag for fault tolerance.
>
>
>
> On Mon, Mar 21, 2016 at 4:16 PM, Hao Ren  wrote:
>
>> Using spark 1.6.1
>> Spark Streaming Jobs are submitted via spark-submit (cluster mode)
>>
>> I tried to kill drivers via webUI, it does not work. These drivers are
>> still running.
>> I also tried:
>> 1. spark-submit --master  --kill 
>> 2. ./bin/spark-class org.apache.spark.deploy.Client kill 
>> 
>>
>> Neither works. The workaround is to ssh to the driver node, then kill -9
>> ...
>> jsp shows the same classname DriverWrapper, so need to pick carefully...
>>
>> Any idea why this happens ?
>> BTW, my streaming job's batch duration is one hour. So do we need to wait
>> for job processing to kill kill driver ?
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>


Re: Building spark submodule source code

2016-03-21 Thread Jakob Odersky
Another gotcha to watch out for are the SPARK_* environment variables.
Have you exported SPARK_HOME? In that case, 'spark-shell' will use
Spark from the variable, regardless of the place the script is called
from.
I.e. if SPARK_HOME points to a release version of Spark, your code
changes will never be available by simply running 'spark-shell'.

On Sun, Mar 20, 2016 at 11:23 PM, Akhil Das  wrote:
> Have a look at the intellij setup
> https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IntelliJ
> Once you have the setup ready, you don't have to recompile the whole stuff
> every time.
>
> Thanks
> Best Regards
>
> On Mon, Mar 21, 2016 at 8:14 AM, Tenghuan He  wrote:
>>
>> Hi everyone,
>>
>> I am trying to add a new method to spark RDD. After changing the code
>> of RDD.scala and running the following command
>> mvn -pl :spark-core_2.10 -DskipTests clean install
>> It BUILD SUCCESS, however, when starting the bin\spark-shell, my
>> method cannot be found.
>> Do I have to rebuild the whole spark project instead the spark-core
>> submodule to make the changes work?
>> Rebuiling the whole project is too time consuming, is there any better
>> choice?
>>
>>
>> Thanks & Best Regards
>>
>> Tenghuan He
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Limit pyspark.daemon threads

2016-03-21 Thread Carlile, Ken



No further input on this? I discovered today that the pyspark.daemon threadcount was actually 48, which makes a little more sense (at least it’s a multiple of 16), and it seems to be happening at reduce and collect portions of the code. 


—Ken



On Mar 17, 2016, at 10:51 AM, Carlile, Ken  wrote:



Thanks! I found that part just after I sent the email… whoops. I’m guessing that’s not an issue for my users, since it’s been set that way for a couple of years now. 


The thread count is definitely an issue, though, since if enough nodes go down, they can’t schedule their spark clusters. 


—Ken



On Mar 17, 2016, at 10:50 AM, Ted Yu  wrote:



I took a look at docs/configuration.md
Though I didn't find an answer to your first question, I think the following pertains to your second question:




spark.python.worker.memory (default: 512m)
    Amount of memory to use per python worker process during aggregation, in the same
    format as JVM memory strings (e.g. 512m, 2g). If the memory used during aggregation
    goes above this amount, it will spill the data into disks.

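A minimal example of setting that property programmatically (Scala shown; the same key can also go in spark-defaults.conf or on the command line with --conf). Whether it helps with the daemon thread count is a separate question — it only bounds per-worker aggregation memory:

import org.apache.spark.{SparkConf, SparkContext}

// "512m" is just the documented default, shown here as a placeholder value.
val conf = new SparkConf()
  .setAppName("pyspark-memory-example")
  .set("spark.python.worker.memory", "512m")
val sc = new SparkContext(conf)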



On Thu, Mar 17, 2016 at 7:43 AM, Carlile, Ken 
 wrote:

Hello,

We have an HPC cluster that we run Spark jobs on using standalone mode and a number of scripts I’ve built up to dynamically schedule and start spark clusters within the Grid Engine framework. Nodes in the cluster have 16 cores and 128GB of RAM.

My users use pyspark heavily. We’ve been having a number of problems with nodes going offline with extraordinarily high load. I was able to look at one of those nodes today before it went truly sideways, and I discovered that the user was running 50 pyspark.daemon
 threads (remember, this is a 16 core box), and the load was somewhere around 25 or so, with all CPUs maxed out at 100%.

So while the spark worker is aware it’s only got 16 cores and behaves accordingly, pyspark seems to be happy to overrun everything like crazy. Is there a global parameter I can use to limit pyspark threads to a sane number, say 15 or 16? It would also be interesting
 to set a memory limit, which leads to another question.

How is memory managed when pyspark is used? I have the spark worker memory set to 90GB, and there is 8GB of system overhead (GPFS caching), so if pyspark operates outside of the JVM memory pool, that leaves it at most 30GB to play with, assuming there is no
 overhead outside the JVM’s 90GB heap (ha ha.)

Thanks,
Ken Carlile
Sr. Unix Engineer
HHMI/Janelia Research Campus
571-209-4363



















-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Merging ML Estimator and Model

2016-03-21 Thread Joseph Bradley
Spark devs & users,

I want to bring attention to a proposal to merge the MLlib (spark.ml)
concepts of Estimator and Model in Spark 2.0.  Please comment & discuss on
SPARK-14033  (not in
this email thread).

*TL;DR:*
*Proposal*: Merge Estimator and Model under a single abstraction
(Estimator).
*Goals*: Simplify API by combining the tightly coupled concepts of
Estimator & Model.  Match other ML libraries like scikit-learn.  Simplify
mutability semantics.

*Details*: See https://issues.apache.org/jira/browse/SPARK-14033 for a
design document (Google doc & PDF).

Thanks in advance for feedback!
Joseph


Re: SparkML algos limitations question.

2016-03-21 Thread Eugene Morozov
Hi, Joseph,

I thought I understood why it has a limit of 30 levels for the decision tree,
but now I'm not that sure. I thought that's because the decision tree is
stored in an array, whose length is of type int and so cannot exceed
2^31-1.
But here are my new discoveries. I've trained two different random forest
models of 50 trees each with different maxDepth (20 and 30) and specified node
size = 5. Here are a couple of those trees:

Model with maxDepth = 20:
depth=20, numNodes=471
depth=19, numNodes=497

Model with maxDepth = 30:
depth=30, numNodes=11347
depth=30, numNodes=10963

It looks like the tree is not very balanced, and I understand why that
happens, but I'm surprised that the actual number of nodes is far less than
2^31 - 1. So now I'm not sure why the limitation actually exists. A tree
consisting of 2^31 nodes would require about 8G of memory just to store
those indexes, so I'd say that depth isn't the biggest issue in such a
case.

Is it possible to work around or simply ignore the maxDepth limitation (without
modifying the codebase) and train the tree until I hit the max number of
nodes? I'd assume that in most cases I simply won't hit it, but the depth
of the tree would be much more than 30.


--
Be well!
Jean Morozov

On Wed, Dec 16, 2015 at 1:00 AM, Joseph Bradley 
wrote:

> Hi Eugene,
>
> The maxDepth parameter exists because the implementation uses Integer node
> IDs which correspond to positions in the binary tree.  This simplified the
> implementation.  I'd like to eventually modify it to avoid depending on
> tree node IDs, but that is not yet on the roadmap.
>
> There is not an analogous limit for the GLMs you listed, but I'm not very
> familiar with the perceptron implementation.
>
> Joseph
>
> On Mon, Dec 14, 2015 at 10:52 AM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Hello!
>>
>> I'm currently working on POC and try to use Random Forest (classification
>> and regression). I also have to check SVM and Multiclass perceptron (other
>> algos are less important at the moment). So far I've discovered that Random
>> Forest has a limitation of maxDepth for trees and just out of curiosity I
>> wonder why such a limitation has been introduced?
>>
>> An actual question is that I'm going to use Spark ML in production next
>> year and would like to know if there are other limitations like maxDepth in
>> RF for other algorithms: Logistic Regression, Perceptron, SVM, etc.
>>
>> Thanks in advance for your time.
>> --
>> Be well!
>> Jean Morozov
>>
>
>


Re: Error selecting from a Hive ORC table in Spark-sql

2016-03-21 Thread Mich Talebzadeh
Sounds like this happens with ORC transactional tables.

When I create that table as ORC but non-transactional, it works!

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 March 2016 at 17:53, Eugene Koifman  wrote:

> The system thinks t2 is an Acid table but the files on disk don’t follow
> the convention acid system would expect.
> Perhaps Xuefu Zhang would know more on Spark/Acid integration.
>
> From: Mich Talebzadeh 
> Reply-To: "u...@hive.apache.org" 
> Date: Monday, March 21, 2016 at 9:39 AM
> To: "user @spark" , user 
> Subject: Error selecting from a Hive ORC table in Spark-sql
>
> Hi,
>
> Do we know the cause of this error when selecting from an Hive ORC table
>
> spark-sql> select * from t2;
> 16/03/21 16:38:33 ERROR SparkSQLDriver: Failed in [select * from t2]
> java.lang.RuntimeException: serious problem
> at
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
> at
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
> at
> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177)
> at
> org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:587)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:308)
> at
> org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NumberFormatException: For input string: "039_"
> at
> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:252)
>  

Find all invoices more than 6 months from csv file

2016-03-21 Thread Mich Talebzadeh
Hi,

For test purposes I am ready a simple csv file as follows:

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")
df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date:
string, Net: string, VAT: string, Total: string]

For this work I am interested in column "Payment Date" > 6 months old from
today

Data is stored in the following format for that column

scala> df.select("Payment date").take(2)
res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])

stored as 'dd/MM/'

The current time I get as

scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
'dd/MM/') ").collect.apply(0).getString(0)
today: String = 21/03/2016


So I want to filter the csv file

scala>  df.filter(col("Payment date") < lit(today)).show(2)
+--++-+-+-+
|Invoice Number|Payment date|  Net|  VAT|Total|
+--++-+-+-+
|   360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
|   361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
+--++-+-+-+


However, I want to use datediff() function here not just < today!


Obviously one can store the file as a table and use SQL on it. However, I
want to see if there are other ways using fp.

Thanks





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: Spark SQL Optimization

2016-03-21 Thread gtinside
More details : 

Execution plan for Original query 
select distinct pge.portfolio_code 
from table1 pge join table2 p 
on p.perm_group = pge.anc_port_group 
join table3 uge 
on p.user_group=uge.anc_user_group 
where uge.user_name = 'user' and p.perm_type = 'TEST' 

== Physical Plan ==
TungstenAggregate(key=[portfolio_code#14119], functions=[],
output=[portfolio_code#14119])
 TungstenExchange hashpartitioning(portfolio_code#14119)
  TungstenAggregate(key=[portfolio_code#14119], functions=[],
output=[portfolio_code#14119])
   TungstenProject [portfolio_code#14119]
BroadcastHashJoin [user_group#13665], [anc_user_group#13658], BuildRight
 TungstenProject [portfolio_code#14119,user_group#13665]
  BroadcastHashJoin [anc_port_group#14117], [perm_group#13667],
BuildRight
   ConvertToUnsafe
Scan
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
   ConvertToUnsafe
Project [user_group#13665,perm_group#13667]
 Filter (perm_type#13666 = TEST)
  Scan
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][user_group#13665,perm_group#13667,perm_type#13666]
 ConvertToUnsafe
  Project [anc_user_group#13658]
   Filter (user_name#13659 = user)
Scan
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]



Execution plan for optimized query 
select distinct pge.portfolio_code 
from table1 uge, table2 p, table3 pge 
where uge.user_name = 'user' and p.perm_type = 'TEST' 
and p.perm_group = pge.anc_port_group 
and p.user_group=uge.anc_user_group 

== Physical Plan ==
TungstenAggregate(key=[portfolio_code#14119], functions=[],
output=[portfolio_code#14119])
 TungstenExchange hashpartitioning(portfolio_code#14119)
  TungstenAggregate(key=[portfolio_code#14119], functions=[],
output=[portfolio_code#14119])
   TungstenProject [portfolio_code#14119]
BroadcastHashJoin [perm_group#13667], [anc_port_group#14117], BuildRight
 TungstenProject [perm_group#13667]
  BroadcastHashJoin [anc_user_group#13658], [user_group#13665],
BuildRight
   ConvertToUnsafe
Project [anc_user_group#13658]
 Filter (user_name#13659 = user)
  Scan
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
   ConvertToUnsafe
Project [perm_group#13667,user_group#13665]
 Filter (perm_type#13666 = TEST)
  Scan
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][perm_group#13667,user_group#13665,perm_type#13666]
 ConvertToUnsafe
  Scan
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]












--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Optimization-tp26548p26553.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL Optimization

2016-03-21 Thread Xiao Li
Hi, Maybe you can open a JIRA and upload your plan as Michael suggested.
This is an interesting feature. Thanks!

Xiao Li

2016-03-21 10:36 GMT-07:00 Michael Armbrust :

> It's helpful if you can include the output of EXPLAIN EXTENDED or
> df.explain(true) whenever asking about query performance.
>
> On Mon, Mar 21, 2016 at 6:27 AM, gtinside  wrote:
>
>> Hi ,
>>
>> I am trying to execute a simple query with join on 3 tables. When I look
>> at
>> the execution plan , it varies with position of table in the "from"
>> clause.
>> Execution plan looks more optimized when the position of table with
>> predicates is specified before any other table.
>>
>>
>> Original query :
>>
>> select distinct pge.portfolio_code
>> from table1 pge join table2 p
>> on p.perm_group = pge.anc_port_group
>> join table3 uge
>> on p.user_group=uge.anc_user_group
>> where uge.user_name = 'user' and p.perm_type = 'TEST'
>>
>> Optimized query (table with predicates is moved ahead):
>>
>> select distinct pge.portfolio_code
>> from table1 uge, table2 p, table3 pge
>> where uge.user_name = 'user' and p.perm_type = 'TEST'
>> and p.perm_group = pge.anc_port_group
>> and p.user_group=uge.anc_user_group
>>
>>
>> Execution plan is more optimized for the optimized query and hence the
>> query
>> executes faster. All the tables are being sourced from parquet files
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Optimization-tp26548.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Best way to store Avro Objects as Parquet using SPARK

2016-03-21 Thread Michael Armbrust
>
> But when tired using Spark streamng I could not find a way to store the
> data with the avro schema information. The closest that I got was to create
> a Dataframe using the json RDDs and store them as parquet. Here the parquet
> files had a spark specific schema in their footer.
>

Does this cause a problem?  This is just extra information that we use to
store metadata that parquet doesn't directly support, but I would still
expect other systems to be able to read it.


Re: Spark SQL Optimization

2016-03-21 Thread Michael Armbrust
It's helpful if you can include the output of EXPLAIN EXTENDED or
df.explain(true) whenever asking about query performance.
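
For anyone following along, a small sketch of what that looks like (placeholder DataFrames, not the tables from the original query):

// Placeholder DataFrames; explain(true) prints the parsed, analyzed and optimized
// logical plans plus the physical plan -- the output worth attaching to questions.
val df1 = sqlContext.range(10).withColumnRenamed("id", "key")
val df2 = sqlContext.range(10).withColumnRenamed("id", "key")
df1.join(df2, "key").explain(true)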

On Mon, Mar 21, 2016 at 6:27 AM, gtinside  wrote:

> Hi ,
>
> I am trying to execute a simple query with join on 3 tables. When I look at
> the execution plan , it varies with position of table in the "from" clause.
> Execution plan looks more optimized when the position of table with
> predicates is specified before any other table.
>
>
> Original query :
>
> select distinct pge.portfolio_code
> from table1 pge join table2 p
> on p.perm_group = pge.anc_port_group
> join table3 uge
> on p.user_group=uge.anc_user_group
> where uge.user_name = 'user' and p.perm_type = 'TEST'
>
> Optimized query (table with predicates is moved ahead):
>
> select distinct pge.portfolio_code
> from table1 uge, table2 p, table3 pge
> where uge.user_name = 'user' and p.perm_type = 'TEST'
> and p.perm_group = pge.anc_port_group
> and p.user_group=uge.anc_user_group
>
>
> Execution plan is more optimized for the optimized query and hence the
> query
> executes faster. All the tables are being sourced from parquet files
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Optimization-tp26548.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Error selecting from a Hive ORC table in Spark-sql

2016-03-21 Thread Mich Talebzadeh
Hi,

Do we know the cause of this error when selecting from a Hive ORC table?

spark-sql> select * from t2;
16/03/21 16:38:33 ERROR SparkSQLDriver: Failed in [select * from t2]
java.lang.RuntimeException: serious problem
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:587)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:308)
at
org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException:
java.lang.NumberFormatException: For input string: "039_"
at
java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:252)
at java.util.concurrent.FutureTask.get(FutureTask.java:111)
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
... 43 more
Caused by: java.lang.NumberFormatException: For input string: "039_"
at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:441)
at java.lang.Long.parseLong(Long.java:483)
at
org.apache.hadoop.hive.ql.io.AcidUtils.parseDelta(AcidUtils.java:310)
at
org.apache.hadoop.hive.ql.io.AcidUtils.getAcidState(AcidUtils.java:379)
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.call(OrcInputFormat.java:634)
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.call(OrcInputFormat.java:620)
at
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at

Re: spark shuffle service on yarn

2016-03-21 Thread Marcelo Vanzin
If you use any shuffle service before 2.0 it should be compatible with
all previous releases.

The 2.0 version currently has an incompatibility that we should
probably patch before releasing 2.0, to support this kind of use case
(among others).

On Fri, Mar 18, 2016 at 7:25 PM, Koert Kuipers  wrote:
> spark on yarn is nice because i can bring my own spark. i am worried that
> the shuffle service forces me to use some "sanctioned" spark version that is
> officially "installed" on the cluster.
>
> so... can i safely install the spark 1.3 shuffle service on yarn and use it
> with other 1.x versions of spark?
>
> thanks



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can not kill driver properly

2016-03-21 Thread Hao Ren
Update:

I am using --supervise flag for fault tolerance.



On Mon, Mar 21, 2016 at 4:16 PM, Hao Ren  wrote:

> Using spark 1.6.1
> Spark Streaming Jobs are submitted via spark-submit (cluster mode)
>
> I tried to kill drivers via webUI, it does not work. These drivers are
> still running.
> I also tried:
> 1. spark-submit --master  --kill 
> 2. ./bin/spark-class org.apache.spark.deploy.Client kill 
> 
>
> Neither works. The workaround is to ssh to the driver node, then kill -9
> ...
> jsp shows the same classname DriverWrapper, so need to pick carefully...
>
> Any idea why this happens ?
> BTW, my streaming job's batch duration is one hour. So do we need to wait
> for job processing to kill kill driver ?
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>



-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Extending Spark Catalyst optimizer with own rules

2016-03-21 Thread tolyasik
I want to use Catalyst rules to transform a star-schema SQL query into a query
against a denormalized star schema, where some fields from the dimension tables
are represented in the fact table.
I tried to find extension points for adding my own rules to perform the
transformation described above, but I didn't find any. So I have the following
questions:
1) How can I add my own rules to the Catalyst optimizer?
2) Is there another way to implement the functionality described above?
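
Roughly speaking, a custom rule is just a Rule[LogicalPlan] whose apply method transforms the plan. The sketch below is illustrative only (it removes a trivial always-true Filter, not the star-schema rewrite described above); how to hook it in depends on the Spark version — some versions expose sqlContext.experimental.extraOptimizations for exactly this, so check the API of the version in use.

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Toy optimizer rule: drop Filter nodes whose condition is the literal `true`.
object RemoveTrivialFilter extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}

// Registration point, if available in your version:
// sqlContext.experimental.extraOptimizations = Seq(RemoveTrivialFilter)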



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extending-Spark-Catalyst-optimizer-with-own-rules-tp26552.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to gracefully handle Kafka OffsetOutOfRangeException

2016-03-21 Thread Ramkumar Venkataraman
Which is what surprises me as well. I am able to consistently reproduce
this on my spark 1.5.2 - the same spark job crashes immediately without
checkpointing, but when I enable it, the job continues in spite of the
exceptions.

On Mon, Mar 21, 2016 at 8:25 PM, Cody Koeninger  wrote:

> Spark streaming in general will retry a batch N times then move on to
> the next one... off the top of my head, I'm not sure why checkpointing
> would have an effect on that.
>
> On Mon, Mar 21, 2016 at 3:25 AM, Ramkumar Venkataraman
>  wrote:
> > Thanks Cody for the quick help. Yes, the exception is happening in the
> > executors during processing. I will look into cloning the KafkaRDD and
> > swallowing the exception.
> >
> > But, something weird is happening: when I enable checkpointing on the
> job,
> > my job doesn't crash, it happily proceeds with the next batch, even
> though I
> > see tons of exceptions in the executor logs. So the question is: why is
> it
> > that the spark job doesn't crash when checkpointing is enabled?
> >
> > I have my code pasted here:
> > https://gist.github.com/ramkumarvenkat/00f4fc63f750c537defd
> >
> > I am not too sure if this is an issue with spark engine or with the
> > streaming module. Please let me know if you need more logs or you want
> me to
> > raise a github issue/JIRA.
> >
> > Sorry for digressing on the original thread.
> >
> > On Fri, Mar 18, 2016 at 8:10 PM, Cody Koeninger 
> wrote:
> >>
> >> Is that happening only at startup, or during processing?  If that's
> >> happening during normal operation of the stream, you don't have enough
> >> resources to process the stream in time.
> >>
> >> There's not a clean way to deal with that situation, because it's a
> >> violation of preconditions.  If you want to modify the code to do what
> >> makes sense for you, start looking at handleFetchErr in KafkaRDD.scala
> >>   Recompiling that package isn't a big deal, because it's not a part
> >> of the core spark deployment, so you'll only have to change your job,
> >> not the deployed version of spark.
> >>
> >>
> >>
> >> On Fri, Mar 18, 2016 at 6:16 AM, Ramkumar Venkataraman
> >>  wrote:
> >> > I am using Spark streaming and reading data from Kafka using
> >> > KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
> >> > smallest.
> >> >
> >> > But in some Kafka partitions, I get
> >> > kafka.common.OffsetOutOfRangeException
> >> > and my spark job crashes.
> >> >
> >> > I want to understand if there is a graceful way to handle this failure
> >> > and
> >> > not kill the job. I want to keep ignoring these exceptions, as some
> >> > other
> >> > partitions are fine and I am okay with data loss.
> >> >
> >> > Is there any way to handle this and not have my spark job crash? I
> have
> >> > no
> >> > option of increasing the kafka retention period.
> >> >
> >> > I tried to have the DStream returned by createDirectStream() wrapped
> in
> >> > a
> >> > Try construct, but since the exception happens in the executor, the
> Try
> >> > construct didn't take effect. Do you have any ideas of how to handle
> >> > this?
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> >
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> > For additional commands, e-mail: user-h...@spark.apache.org
> >> >
> >
> >
>


Re: Spark Metrics Framework?

2016-03-21 Thread Silvio Fiorito
You could use the metric sources and sinks described here: 
http://spark.apache.org/docs/latest/monitoring.html#metrics

If you want to push the metrics to another system you can define a custom sink. 
You can also extend the metrics by defining a custom source.
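
As a rough sketch of what a custom source can look like (hedged: the Source trait is package-private in the 1.x line, so user code typically has to live under an org.apache.spark package, and the registration call should be verified against the version in use):

package org.apache.spark.metrics.source   // Source is package-private in Spark 1.x

import com.codahale.metrics.{Counter, MetricRegistry}

// Minimal custom source exposing a single counter; all names here are illustrative.
class EsHadoopSource extends Source {
  override val sourceName: String = "esHadoop"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  val docsWritten: Counter = metricRegistry.counter(MetricRegistry.name("docsWritten"))
}

// Registration, e.g. from code that can see SparkEnv (verify for your version):
// SparkEnv.get.metricsSystem.registerSource(new EsHadoopSource)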

From: Mike Sukmanowsky
Date: Monday, March 21, 2016 at 11:54 AM
To: "user@spark.apache.org"
Subject: Spark Metrics Framework?

We make extensive use of the elasticsearch-hadoop library for Hadoop/Spark. In
trying to troubleshoot our Spark applications, it'd be very handy to have
access to some of the many metrics that the library makes available when
running in map reduce mode. The library's author noted that Spark doesn't
offer any kind of a similar metrics API whereby these metrics could be
reported or aggregated on.

Are there any plans to bring a metrics framework similar to Hadoop's Counter
system to Spark, or is there an alternative means for us to grab metrics exposed
when using Hadoop APIs to load/save RDDs?

Thanks,
Mike


Spark Metrics Framework?

2016-03-21 Thread Mike Sukmanowsky
We make extensive use of the elasticsearch-hadoop library for Hadoop/Spark.
In trying to troubleshoot our Spark applications, it'd be very handy to
have access to some of the many metrics that the library makes available
when running in map reduce mode. The library's author noted that Spark
doesn't offer any kind of a similar metrics API whereby these metrics could
be reported or aggregated on.

Are there any plans to bring a metrics framework similar to Hadoop's
Counter system to Spark, or is there an alternative means for us to grab
metrics exposed when using Hadoop APIs to load/save RDDs?

Thanks,
Mike


HADOOP_HOME or hadoop.home.dir are not set

2016-03-21 Thread Hari Krishna Dara
I am using Spark 1.5.2 in yarn mode with Hadoop 2.6.0 (cdh5.4.2) and I am
consistently seeing the below exception in the map container logs for Spark
jobs (full stacktrace at the end of the message):

java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:304)
at org.apache.hadoop.util.Shell.(Shell.java:329)
at org.apache.hadoop.util.StringUtils.(StringUtils.java:79)
at
org.apache.hadoop.yarn.conf.YarnConfiguration.(YarnConfiguration.java:605)
at
org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:61)

This is the code that generates the above exception:

String home = System.getProperty("hadoop.home.dir");

// fall back to the system/user-global env variable
if (home == null) {
  home = System.getenv("HADOOP_HOME");
}

try {
  // couldn't find either setting for hadoop's home directory
  if (home == null) {
    throw new IOException("HADOOP_HOME or hadoop.home.dir are not set.");
  }

I have hadoop home set in multiple places, such as:
- in bin/yarn as a system property
- in libexec/hadoop-config.sh as environment variable
- in conf/spark-env.sh as environment variable

However, this doesn't get passed in to the container JVM's. In fact, that
is the case even with a plain YARN job. I took a simple WordCount
application and added setup() method with the below code:

String homeDirProp = System.getProperty("hadoop.home.dir");
String homeDirEnv = System.getenv("HADOOP_HOME");
System.out.println("hadoop.home.dir="+homeDirProp+"
HADOOP_HOME="+homeDirEnv);

and when I check the stdout of the containers, I see this:

hadoop.home.dir=null HADOOP_HOME=null

As it stands, the IOException doesn't immediately fail the job, but I am
trying to understand   another issue with determining proxy IP and want to
rule this out. Interestingly, there doesn't seem to be anyway to pass a
system property or environment variable to map/reduce containers, so there
is no direct way to satisfy the Shell class, but it would be possible for
some other class to inject the system property as a workaround before it is
looked up by Shell.

Anyone else seen this issue? Could I be missing something here?

Thank you,
Hari

Full stack trace:

java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:304)
at org.apache.hadoop.util.Shell.(Shell.java:329)
at org.apache.hadoop.util.StringUtils.(StringUtils.java:79)
at
org.apache.hadoop.yarn.conf.YarnConfiguration.(YarnConfiguration.java:605)
at
org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:61)
at
org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:52)
at
org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.(YarnSparkHadoopUtil.scala:46)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.lang.Class.newInstance(Class.java:442)
at
org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:386)
at
org.apache.spark.deploy.SparkHadoopUtil$.yarn$lzycompute(SparkHadoopUtil.scala:384)
at
org.apache.spark.deploy.SparkHadoopUtil$.yarn(SparkHadoopUtil.scala:384)
at
org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:401)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:149)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:250)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)


Re: Zip File and XML parsing with Spark Streaming

2016-03-21 Thread tjb305
Many thanks I will try that and come back with my findings.

Toby

On 21 March 2016 at 03:15, firemonk91 [via Apache Spark User List] <
ml-node+s1001560n26544...@n3.nabble.com> wrote:

> You can write the incoming message to a temp location and use Java
> ZipInputStream to unzip the file. You probably can extract to a folder (if
> they are not already extracted to a folder) and just scan and parse the xml
> in the folder.
>
> Dhiraj Peechara
>
>
>
>



-- 
Toby
t...@mrtydiablo.co.uk




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Zip-File-and-XML-parsing-with-Spark-Streaming-tp26527p26551.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Can not kill driver properly

2016-03-21 Thread Hao Ren
Using spark 1.6.1
Spark Streaming Jobs are submitted via spark-submit (cluster mode)

I tried to kill drivers via webUI, it does not work. These drivers are
still running.
I also tried:
1. spark-submit --master <spark-master-url> --kill <driver-id>
2. ./bin/spark-class org.apache.spark.deploy.Client kill <spark-master-url> <driver-id>


Neither works. The workaround is to ssh to the driver node, then kill -9 ...
jps shows the same class name DriverWrapper for every driver, so you need to
pick the right process carefully...

Any idea why this happens?
BTW, my streaming job's batch duration is one hour. So do we need to wait
for the current batch to finish before the driver can be killed?

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: How to gracefully handle Kafka OffsetOutOfRangeException

2016-03-21 Thread Cody Koeninger
Spark streaming in general will retry a batch N times then move on to
the next one... off the top of my head, I'm not sure why checkpointing
would have an effect on that.

On Mon, Mar 21, 2016 at 3:25 AM, Ramkumar Venkataraman
 wrote:
> Thanks Cody for the quick help. Yes, the exception is happening in the
> executors during processing. I will look into cloning the KafkaRDD and
> swallowing the exception.
>
> But, something weird is happening: when I enable checkpointing on the job,
> my job doesn't crash, it happily proceeds with the next batch, even though I
> see tons of exceptions in the executor logs. So the question is: why is it
> that the spark job doesn't crash when checkpointing is enabled?
>
> I have my code pasted here:
> https://gist.github.com/ramkumarvenkat/00f4fc63f750c537defd
>
> I am not too sure if this is an issue with spark engine or with the
> streaming module. Please let me know if you need more logs or you want me to
> raise a github issue/JIRA.
>
> Sorry for digressing on the original thread.
>
> On Fri, Mar 18, 2016 at 8:10 PM, Cody Koeninger  wrote:
>>
>> Is that happening only at startup, or during processing?  If that's
>> happening during normal operation of the stream, you don't have enough
>> resources to process the stream in time.
>>
>> There's not a clean way to deal with that situation, because it's a
>> violation of preconditions.  If you want to modify the code to do what
>> makes sense for you, start looking at handleFetchErr in KafkaRDD.scala
>>   Recompiling that package isn't a big deal, because it's not a part
>> of the core spark deployment, so you'll only have to change your job,
>> not the deployed version of spark.
>>
>>
>>
>> On Fri, Mar 18, 2016 at 6:16 AM, Ramkumar Venkataraman
>>  wrote:
>> > I am using Spark streaming and reading data from Kafka using
>> > KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
>> > smallest.
>> >
>> > But in some Kafka partitions, I get
>> > kafka.common.OffsetOutOfRangeException
>> > and my spark job crashes.
>> >
>> > I want to understand if there is a graceful way to handle this failure
>> > and
>> > not kill the job. I want to keep ignoring these exceptions, as some
>> > other
>> > partitions are fine and I am okay with data loss.
>> >
>> > Is there any way to handle this and not have my spark job crash? I have
>> > no
>> > option of increasing the kafka retention period.
>> >
>> > I tried to have the DStream returned by createDirectStream() wrapped in
>> > a
>> > Try construct, but since the exception happens in the executor, the Try
>> > construct didn't take effect. Do you have any ideas of how to handle
>> > this?
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to collect data for some particular point in spark streaming

2016-03-21 Thread Cody Koeninger
Kafka doesn't have an accurate time-based index.  Your options are to
maintain an index yourself, or start at a sufficiently early offset
and filter messages.
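For illustration only, the second option might look roughly like this (the
broker, topic and the extractTimestamp helper are placeholders/hypothetical,
not something from this thread):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("from-timestamp"), Seconds(60))

// Start from the earliest offsets Kafka still retains...
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",   // placeholder broker
  "auto.offset.reset"    -> "smallest")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))            // placeholder topic

// ...then drop every record older than the timestamp of interest.
val fromTime = 1458540000000L                 // desired start, epoch millis
val filtered = stream.filter { case (_, json) =>
  // extractTimestamp is a hypothetical helper that parses the event time
  // out of the JSON payload.
  extractTimestamp(json) >= fromTime
}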

On Mon, Mar 21, 2016 at 7:28 AM, Nagu Kothapalli
 wrote:
> Hi,
>
>
> I Want to collect data from kafka ( json Data , Ordered )   to particular
> time stamp . is there any way to  do with spark streaming ?
>
> Please let me know.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Saving model S3

2016-03-21 Thread Ted Yu
Please see this related thread:

http://search-hadoop.com/m/q3RTtSYa3F1OT6H=DirectFileOutputCommiter

On Mon, Mar 21, 2016 at 7:45 AM, Yasemin Kaya  wrote:

> Hi Ted,
>
> I don't understand the issue that you want to learn? Could you be more
> clear please?
>
>
>
>
>
> 2016-03-21 15:24 GMT+02:00 Ted Yu :
>
>> Was speculative execution enabled ?
>>
>> Thanks
>>
>> On Mar 21, 2016, at 6:19 AM, Yasemin Kaya  wrote:
>>
>> Hi,
>>
>> I am using S3 read data also I want to save my model S3. In reading part
>> there is no error, but when I save model I am getting this error
>> . I tried to
>> change the way from s3n to s3a but nothing change, different errors comes.
>>
>> *reading path*
>> s3n://tani-online/weblog/
>>
>> *model saving path*
>> s3n://tani-online/model/
>>
>> *configuration*
>>
>> sc.hadoopConfiguration().set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem");
>> sc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID);
>> sc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
>> AWS_SECRET_ACCESS_KEY);
>>
>>
>> ps: I am using spark-1.6.0-bin-hadoop2.4
>>
>> Best,
>> yasemin
>>
>> --
>> hiç ender hiç
>>
>>
>
>
> --
> hiç ender hiç
>


Re: Saving model S3

2016-03-21 Thread Yasemin Kaya
Hi Ted,

I don't understand what you are asking about. Could you be more clear,
please?





2016-03-21 15:24 GMT+02:00 Ted Yu :

> Was speculative execution enabled ?
>
> Thanks
>
> On Mar 21, 2016, at 6:19 AM, Yasemin Kaya  wrote:
>
> Hi,
>
> I am using S3 read data also I want to save my model S3. In reading part
> there is no error, but when I save model I am getting this error
> . I tried to
> change the way from s3n to s3a but nothing change, different errors comes.
>
> *reading path*
> s3n://tani-online/weblog/
>
> *model saving path*
> s3n://tani-online/model/
>
> *configuration*
>
> sc.hadoopConfiguration().set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem");
> sc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID);
> sc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
> AWS_SECRET_ACCESS_KEY);
>
>
> ps: I am using spark-1.6.0-bin-hadoop2.4
>
> Best,
> yasemin
>
> --
> hiç ender hiç
>
>


-- 
hiç ender hiç


Re: Setting up spark to run on two nodes

2016-03-21 Thread Luciano Resende
There are also sbin/start-all.sh and sbin/stop-all.sh, which let you
start/stop the master and workers all together.

On Sunday, March 20, 2016, Akhil Das  wrote:

> You can simply execute the sbin/start-slaves.sh file to start up all slave
> processes. Just make sure you have spark installed on the same path on all
> the machines.
>
> Thanks
> Best Regards
>
> On Sat, Mar 19, 2016 at 4:01 AM, Ashok Kumar  > wrote:
>
>> Experts.
>>
>> Please your valued advice.
>>
>> I have spark 1.5.2 set up as standalone for now and I have started the
>> master as below
>>
>> start-master.sh
>>
>> I also have modified config/slave file to have
>>
>> # A Spark Worker will be started on each of the machines listed below.
>> localhost
>> workerhost
>>
>>
>> On the localhost I start slave as follows:
>>
>> start-slave.sh spark:localhost:7077
>>
>> Questions.
>>
>> If I want worker process to be started not only on localhost but also
>> workerhost
>>
>> 1) Do I need just to do start-slave.sh on localhost and it will start
>> the worker process on other node -> workerhost
>> 2) Do I have to runt start-slave.sh spark:workerhost:7077 as well locally
>> on workerhost
>> 3) On GUI http:// 
>> localhost:4040/environment/
>> I do not see any reference to worker process running on workerhost
>>
>> Appreciate any help on how to go about starting the master on localhost
>> and starting two workers one on localhost and the other on workerhost
>>
>> Thanking you
>>
>>
>

-- 
Sent from my Mobile device


Re: Using lz4 in Kafka seems to be broken by jpountz dependency upgrade in Spark 1.5.x+

2016-03-21 Thread Marcin Kuthan
Hi Stefan

Have you got any response from the Spark team regarding LZ4 library
compatibility? To avoid this kind of problem, lz4 should be shaded in the
Spark distribution, IMHO.

Currently I'm not able to update Spark in my application due to this issue.
It is not possible to consume compressed topics (lz4) using Spark Streaming
1.5 or higher :-(
As a temporary workaround I could patch the LZ4 net.jpountz.util.Utils class
or Kafka's KafkaLZ4BlockInputStream/KafkaLZ4BlockOutputStream classes.
Neither is elegant nor safe.
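Shading on the application side instead might be another stop-gap; with
sbt-assembly it would look roughly like the untested sketch below (the
"shadedlz4" prefix is arbitrary, and this only helps if the relocated
classes actually win on the classpath):

// build.sbt -- relocate lz4 inside the application assembly so the Kafka
// client sees its own renamed copy instead of the version Spark ships.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("net.jpountz.**" -> "shadedlz4.@1").inAll
)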

Marcin


On 12 January 2016 at 10:30, Stefan Schadwinkel <
stefan.schadwin...@smaato.com> wrote:

> Hi all,
>
> we'd like to upgrade one of our Spark jobs from 1.4.1 to 1.5.2 (we run
> Spark on Amazon EMR).
>
> The job consumes and pushes lz4 compressed data from/to Kafka.
>
> When upgrading to 1.5.2 everything works fine, except we get the following
> exception:
>
> java.lang.NoSuchMethodError: net.jpountz.util.Utils.checkRange([BII)V
> at
> org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.write(KafkaLZ4BlockOutputStream.java:179)
> at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> at org.apache.kafka.common.record.Compressor.putLong(Compressor.java:132)
> at
> org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:85)
> at
> org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:63)
> at
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:171)
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:338)
>
>
> Some digging yields:
>
> - The net.jpountz.lz4/lz4 dependency was upgraded from 1.2.0 to 1.3.0 in
> Spark 1.5+ to fix some issues with IBM JDK:
> https://issues.apache.org/jira/browse/SPARK-7063
>
> - The net.jpountz.lz4 1.3.0 version refactored net.jpountz.util.Utils to
> net.jpountz.util.SafeUtils, thus yielding the above inconsistency:
> https://github.com/jpountz/lz4-java/blob/1.3.0/src/java/net/jpountz/util/SafeUtils.java
>
> - Kafka on github up to 0.9.0.0 uses jpountz 1.2.0 (
> https://github.com/apache/kafka/blob/0.9.0.0/build.gradle), however, the
> latest trunk upgrades to 1.3.0.
>
>
> Patching Kafka to use SafeUtils is easy and creating Jar for our projects
> that includes the correct depencies as well, the option of compiling Spark
> 1.5.x with jpountz 1.2.0 should also work, but I didn't try yet.
>
> The main problem is that Spark 1.5.x+ and all Kafka 0.8 releases are
> incompatible in regard to lz4 compression and we would like to avoid
> provisioning EMR with a custom Spark through bootstrapping due to the
> operational overhead.
>
> One could try to play with the classpath and a Jar file with compatible
> dependencies, but I was wondering if nobody else uses Kafka with lz4 and
> Spark and has run into the same issue?
>
> Maybe there's also an easier way to reconcile the situation?
>
> BTW: There's a similar issue regarding Druid as well, but no
> reconciliation beyond patching Kafka was discussed:
> https://groups.google.com/forum/#!topic/druid-user/ZW_Clovf42k
>
> Any input would be highly appreciated.
>
>
> Best regards,
> Stefan
>
>
> --
>
> *Dr. Stefan Schadwinkel*
> Senior Big Data Developer
> stefan.schadwin...@smaato.com
>
>
>
>
> Smaato Inc.
> San Francisco – New York - Hamburg - Singapore
> www.smaato.com
>
>
>
>
>
> Germany:
> Valentinskamp 70, Emporio, 19th Floor
>
> 20355 Hamburg
>
>
> T  +49 (40) 3480 949 0
> F  +49 (40) 492 19 055
>
>
>
> The information contained in this communication may be CONFIDENTIAL and is
> intended only for the use of the recipient(s) named above. If you are not
> the intended recipient, you are hereby notified that any dissemination,
> distribution, or copying of this communication, or any of its contents, is
> strictly prohibited. If you have received this communication in error,
> please notify the sender and delete/destroy the original message and any
> copy of it from your computer or paper files.
>
>
>
>
>


Re: cluster randomly re-starting jobs

2016-03-21 Thread Ted Yu
Can you provide a bit more information ?

Release of Spark and YARN

Have you checked Spark UI / YARN job log to see if there is some clue ?

Cheers

On Mon, Mar 21, 2016 at 6:21 AM, Roberto Pagliari  wrote:

> I noticed that sometimes the spark cluster seems to restart the job
> completely.
>
> In the Ambari UI (where I can check jobs/stages) everything that was done
> up to a certain point is removed, and the job is restarted.
>
> Does anyone know what the issue could be?
>
> Thank you,
>
>


Spark SQL Optimization

2016-03-21 Thread gtinside
Hi ,

I am trying to execute a simple query with joins on 3 tables. When I look at
the execution plan, it varies with the position of the tables in the "from"
clause. The execution plan looks more optimized when the table with the
predicates is placed before the other tables.


Original query :

select distinct pge.portfolio_code 
from table1 pge join table2 p
on p.perm_group = pge.anc_port_group 
join table3 uge
on p.user_group=uge.anc_user_group
where uge.user_name = 'user' and p.perm_type = 'TEST'

Optimized query (table with predicates is moved ahead):

select distinct pge.portfolio_code 
from table1 uge, table2 p, table3 pge 
where uge.user_name = 'user' and p.perm_type = 'TEST' 
and p.perm_group = pge.anc_port_group 
and p.user_group=uge.anc_user_group


The execution plan is more optimized for the second query and hence it
executes faster. All the tables are sourced from Parquet files.
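The two plans can be compared with explain, e.g. (a sketch; originalQuery
and reorderedQuery stand for the two SQL strings above):

// Print the extended (logical + physical) plan for each formulation.
sqlContext.sql(originalQuery).explain(true)
sqlContext.sql(reorderedQuery).explain(true)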



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Optimization-tp26548.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Saving model S3

2016-03-21 Thread Ted Yu
Was speculative execution enabled ?

Thanks

> On Mar 21, 2016, at 6:19 AM, Yasemin Kaya  wrote:
> 
> Hi,
> 
> I am using S3 read data also I want to save my model S3. In reading part 
> there is no error, but when I save model I am getting this error . I tried to 
> change the way from s3n to s3a but nothing change, different errors comes.
> 
> reading path
> s3n://tani-online/weblog/
> 
> model saving path
> s3n://tani-online/model/
> 
> configuration
> sc.hadoopConfiguration().set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem");
>   sc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 
> AWS_ACCESS_KEY_ID);
>   sc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 
> AWS_SECRET_ACCESS_KEY);
> 
> 
> ps: I am using spark-1.6.0-bin-hadoop2.4
> 
> Best,
> yasemin
> 
> -- 
> hiç ender hiç


cluster randomly re-starting jobs

2016-03-21 Thread Roberto Pagliari
I noticed that sometimes the spark cluster seems to restart the job completely.

In the Ambari UI (where I can check jobs/stages) everything that was done up to 
a certain point is removed, and the job is restarted.

Does anyone know what the issue could be?

Thank you,



Saving model S3

2016-03-21 Thread Yasemin Kaya
Hi,

I am reading data from S3 and I also want to save my model to S3. There is
no error in the reading part, but when I save the model I get this error.
I tried changing the scheme from s3n to s3a, but nothing changed; different
errors come up.

*reading path*
s3n://tani-online/weblog/

*model saving path*
s3n://tani-online/model/

*configuration*
sc.hadoopConfiguration().set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem");
sc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID);
sc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
AWS_SECRET_ACCESS_KEY);


ps: I am using spark-1.6.0-bin-hadoop2.4

Best,
yasemin

-- 
hiç ender hiç


How to collect data for some particular point in spark streaming

2016-03-21 Thread Nagu Kothapalli
Hi,


I want to collect data from Kafka (JSON data, ordered) up to a particular
timestamp. Is there any way to do this with Spark Streaming?

Please let me know.


RE: Run External R script from Spark

2016-03-21 Thread Sun, Rui
It’s a possible approach. It actually leverages Spark’s parallel execution.  
PipeRDD’s  launching of external processes is just like that in pySpark and 
SparkR for RDD API.

The concern is pipeRDD relies on text based serialization/deserialization. 
Whether the performance is acceptable actually depends on your workload and 
cluster configurations. You can do some profiling to evaluate it.
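A minimal sketch of the pipeRDD pattern being discussed (the script path and
the record format are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("pipe-to-r"))

// Each element is written as one line of text to the R script's stdin, and
// every line the script prints to stdout becomes one element of the result.
val input  = sc.parallelize(Seq("1,2.0", "2,3.5", "3,7.1"))
val scored = input.pipe("Rscript /path/to/forecast.R")   // placeholder script
scored.collect().foreach(println)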

From: sujeet jog [mailto:sujeet@gmail.com]
Sent: Monday, March 21, 2016 2:10 PM
To: user@spark.apache.org
Subject: Run External R script from Spark

Hi,

I have been working on a POC on some time series related stuff, i'm using 
python since i need spark streaming and sparkR is yet to have a spark streaming 
front end,  couple of algorithms i want to use are not yet present in Spark-TS 
package, so I'm thinking of invoking a external R script for the Algorithm part 
& pass the data from Spark to the R script via pipeRdd,


What i wanted to understand is can something like this be used in a production 
deployment,  since passing the data via R script would mean lot of serializing 
and would actually not use the power of spark for parallel execution,

Has anyone used this kind of workaround  Spark -> pipeRdd-> R script.


Thanks,
Sujeet


SparkSQL 2.0 snapshot - thrift server behavior

2016-03-21 Thread Raymond Honderdors
Hi,

We were running with spark 1.6.x and using the "SHOW TABLES IN 'default'" 
command to read the list of tables. I have noticed that when I run the same on 
version 2.0.0 I get an empty result, but when I run "SHOW TABLES" I get the 
result I am after.

Can we get the support back for the "SHOW TABLES IN 'default'"?


Raymond Honderdors
Team Lead Analytics BI
Business Intelligence Developer
raymond.honderd...@sizmek.com
T +972.7325.3569
Herzliya




Re: Best way to store Avro Objects as Parquet using SPARK

2016-03-21 Thread Manivannan Selvadurai
Hi,

Which version of spark are you using??

On Mon, Mar 21, 2016 at 12:28 PM, Sebastian Piu 
wrote:

> We use this, but not sure how the schema is stored
>
> Job job = Job.getInstance();
> ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
> AvroParquetOutputFormat.setSchema(job, schema);
> LazyOutputFormat.setOutputFormatClass(job, new
> ParquetOutputFormat().getClass());
> job.getConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs",
> "false");
> job.getConfiguration().set("parquet.enable.summary-metadata", "false");
>
> //save the file
> rdd.mapToPair(me -> new Tuple2(null, me))
> .saveAsNewAPIHadoopFile(
> String.format("%s/%s", path, timeStamp.milliseconds()),
> Void.class,
> clazz,
> LazyOutputFormat.class,
> job.getConfiguration());
>
> On Mon, 21 Mar 2016, 05:55 Manivannan Selvadurai, <
> smk.manivan...@gmail.com> wrote:
>
>> Hi All,
>>
>>   In my current project there is a requirement to store avro data
>> (json format) as parquet files.
>> I was able to use AvroParquetWriter in separately to create the Parquet
>> Files. The parquet files along with the data also had the 'avro schema'
>> stored on them as a part of their footer.
>>
>>But when tired using Spark streamng I could not find a way to
>> store the data with the avro schema information. The closest that I got was
>> to create a Dataframe using the json RDDs and store them as parquet. Here
>> the parquet files had a spark specific schema in their footer.
>>
>>   Is this the right approach or do I have a better one. Please guide
>> me.
>>
>>
>> We are using Spark 1.4.1.
>>
>> Thanks In Advance!!
>>
>


java.lang.OutOfMemoryError: Direct buffer memory when using broadcast join

2016-03-21 Thread Dai, Kevin
Hi All,


I'm joining a small table (about 200m) with a huge table using a broadcast
join; however, Spark throws the following exception:


16/03/20 22:32:06 WARN TransportChannelHandler: Exception in connection from
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:658)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
at io.netty.buffer.CompositeByteBuf.allocBuffer(CompositeByteBuf.java:1345)
at io.netty.buffer.CompositeByteBuf.consolidateIfNeeded(CompositeByteBuf.java:276)
at io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:116)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:82)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

Can anyone tell me what's wrong and how to fix it?

Best Regards,
Kevin.



Re: How to gracefully handle Kafka OffsetOutOfRangeException

2016-03-21 Thread Ramkumar Venkataraman
Thanks Cody for the quick help. Yes, the exception is happening in the
executors during processing. I will look into cloning the KafkaRDD and
swallowing the exception.

But, something weird is happening: when I enable checkpointing on the job,
my job doesn't crash, it happily proceeds with the next batch, even though
I see tons of exceptions in the executor logs. So the question is: why is
it that the spark job doesn't crash when checkpointing is enabled?

I have my code pasted here:
https://gist.github.com/ramkumarvenkat/00f4fc63f750c537defd

I am not too sure if this is an issue with spark engine or with the
streaming module. Please let me know if you need more logs or you want me
to raise a github issue/JIRA.

Sorry for digressing on the original thread.

On Fri, Mar 18, 2016 at 8:10 PM, Cody Koeninger  wrote:

> Is that happening only at startup, or during processing?  If that's
> happening during normal operation of the stream, you don't have enough
> resources to process the stream in time.
>
> There's not a clean way to deal with that situation, because it's a
> violation of preconditions.  If you want to modify the code to do what
> makes sense for you, start looking at handleFetchErr in KafkaRDD.scala
>   Recompiling that package isn't a big deal, because it's not a part
> of the core spark deployment, so you'll only have to change your job,
> not the deployed version of spark.
>
>
>
> On Fri, Mar 18, 2016 at 6:16 AM, Ramkumar Venkataraman
>  wrote:
> > I am using Spark streaming and reading data from Kafka using
> > KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
> > smallest.
> >
> > But in some Kafka partitions, I get
> kafka.common.OffsetOutOfRangeException
> > and my spark job crashes.
> >
> > I want to understand if there is a graceful way to handle this failure
> and
> > not kill the job. I want to keep ignoring these exceptions, as some other
> > partitions are fine and I am okay with data loss.
> >
> > Is there any way to handle this and not have my spark job crash? I have
> no
> > option of increasing the kafka retention period.
> >
> > I tried to have the DStream returned by createDirectStream() wrapped in a
> > Try construct, but since the exception happens in the executor, the Try
> > construct didn't take effect. Do you have any ideas of how to handle
> this?
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Issue with wholeTextFiles

2016-03-21 Thread Sarath Chandra
I'm using Hadoop 1.0.4 and Spark 1.2.0.

I'm facing a strange issue. I have a requirement to read a small file from
HDFS, and all of its content has to be read in one shot. So I'm using the
Spark context's wholeTextFiles API, passing the HDFS URL of the file.

When I try this from a spark shell it works as described in the
documentation, but when I try the same through a program (by submitting the
job to the cluster) I get a FileNotFoundException. I have all compatible
JARs in place.
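The usage is essentially the following (a simplified sketch; the path is a
placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("WholeTextFilesCheck"))

// wholeTextFiles returns (path, entireFileContent) pairs, so the whole file
// is read in one shot.
val contents: Array[(String, String)] =
  sc.wholeTextFiles("hdfs://namenode:8020/path/to/small-file").collect()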

Please help.


Re: declare constant as date

2016-03-21 Thread Divya Gehlot
Oh my, I am so silly.

I can declare it as a string and cast it to a date.

My apologies for spamming the mailing list.
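For the record, a minimal sketch of what I mean (Spark 1.5.2; the literal
date is just an example):

import org.apache.spark.sql.functions.{lit, to_date}

// As a DataFrame column: declare the constant as a string, convert to date.
val startDateCol = to_date(lit("2015-03-02"))

// As a plain Scala value rather than a Column:
val startDate: java.sql.Date = java.sql.Date.valueOf("2015-03-02")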

Thanks,
Divya

On 21 March 2016 at 14:51, Divya Gehlot  wrote:

> Hi,
> In Spark 1.5.2
> Do we have any utiility which converts a constant value as shown below
> orcan we declare a date variable like val start_date :Date = "2015-03-02"
>
> val start_date = "2015-03-02" toDate
> like how we convert to toInt ,toString
> I searched for it but  couldnt find it
>
>
> Thanks,
> Divya
>


Re: Best way to store Avro Objects as Parquet using SPARK

2016-03-21 Thread Sebastian Piu
We use this, but not sure how the schema is stored

Job job = Job.getInstance();
ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
AvroParquetOutputFormat.setSchema(job, schema);
LazyOutputFormat.setOutputFormatClass(job, new ParquetOutputFormat().getClass());
job.getConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
job.getConfiguration().set("parquet.enable.summary-metadata", "false");

// save the file
rdd.mapToPair(me -> new Tuple2(null, me))
   .saveAsNewAPIHadoopFile(
       String.format("%s/%s", path, timeStamp.milliseconds()),
       Void.class,
       clazz,
       LazyOutputFormat.class,
       job.getConfiguration());

On Mon, 21 Mar 2016, 05:55 Manivannan Selvadurai, 
wrote:

> Hi All,
>
>   In my current project there is a requirement to store avro data
> (json format) as parquet files.
> I was able to use AvroParquetWriter in separately to create the Parquet
> Files. The parquet files along with the data also had the 'avro schema'
> stored on them as a part of their footer.
>
>But when tired using Spark streamng I could not find a way to
> store the data with the avro schema information. The closest that I got was
> to create a Dataframe using the json RDDs and store them as parquet. Here
> the parquet files had a spark specific schema in their footer.
>
>   Is this the right approach or do I have a better one. Please guide
> me.
>
>
> We are using Spark 1.4.1.
>
> Thanks In Advance!!
>


Re: Setting up spark to run on two nodes

2016-03-21 Thread Akhil Das
You can simply execute the sbin/start-slaves.sh file to start up all slave
processes. Just make sure you have spark installed on the same path on all
the machines.

Thanks
Best Regards

On Sat, Mar 19, 2016 at 4:01 AM, Ashok Kumar 
wrote:

> Experts.
>
> Please your valued advice.
>
> I have spark 1.5.2 set up as standalone for now and I have started the
> master as below
>
> start-master.sh
>
> I also have modified config/slave file to have
>
> # A Spark Worker will be started on each of the machines listed below.
> localhost
> workerhost
>
>
> On the localhost I start slave as follows:
>
> start-slave.sh spark:localhost:7077
>
> Questions.
>
> If I want worker process to be started not only on localhost but also
> workerhost
>
> 1) Do I need just to do start-slave.sh on localhost and it will start the
> worker process on other node -> workerhost
> 2) Do I have to runt start-slave.sh spark:workerhost:7077 as well locally
> on workerhost
> 3) On GUI http:// 
> localhost:4040/environment/
> I do not see any reference to worker process running on workerhost
>
> Appreciate any help on how to go about starting the master on localhost
> and starting two workers one on localhost and the other on workerhost
>
> Thanking you
>
>


declare constant as date

2016-03-21 Thread Divya Gehlot
Hi,
In Spark 1.5.2
Do we have any utility which converts a constant value as shown below, or
can we declare a date variable like val start_date: Date = "2015-03-02"?

val start_date = "2015-03-02" toDate

like how we convert with toInt or toString? I searched for it but couldn't
find it.


Thanks,
Divya


Re: Potential conflict with org.iq80.snappy in Spark 1.6.0 environment?

2016-03-21 Thread Akhil Das
Looks like a jar conflict. Could you paste the piece of code, and show what
your dependency file looks like?

Thanks
Best Regards

On Sat, Mar 19, 2016 at 7:49 AM, vasu20  wrote:

> Hi,
>
> I have some code that parses a snappy thrift file for objects.  This code
> works fine when run standalone (outside of the Spark environment).
> However,
> when running from within Spark, I get an IllegalAccessError exception from
> the org.iq80.snappy package.  Has anyone else seen this error and/or do you
> have any suggestions?  Any pointers appreciated.  Thanks!
>
> Vasu
>
> --
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
> Lost task 0.0 in stage 0.0 (TID 0, localhost):
> java.lang.IllegalAccessError:
> tried to access class org.iq80.snappy.BufferRecycler from class
> org.iq80.snappy.AbstractSnappyInputStream
> at
>
> org.iq80.snappy.AbstractSnappyInputStream.(AbstractSnappyInputStream.java:91)
> at
>
> org.iq80.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:38)
> at DistMatchMetric$1.call(DistMatchMetric.java:131)
> at DistMatchMetric$1.call(DistMatchMetric.java:123)
> at
>
> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
>
> scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
> at
> scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
> at
>
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:1011)
> at
>
> org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:1009)
> at
> org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
> at
> org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Potential-conflict-with-org-iq80-snappy-in-Spark-1-6-0-environment-tp26539.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


How to name features and perform custom cross validation in ML

2016-03-21 Thread iguana314
Hello,

I'm trying to do a simple linear regression in Spark ML. Below is my
DataFrame along with some sample code and output, run via Spyder on a local
Spark cluster.

*##
#Begin Code
##*
regressionDF.show(5)
+---++
|  label|features|
+---++
|59222.0|[1.49297445325996...|
|68212.0|[1.49297445325996...|
|68880.0|[1.49297445325996...|
|69307.0|[1.49297445325996...|
|81900.0|[1.49297445325996...|
+---++
only showing top 5 rows

lr2 = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr2.fit(regressionDF)

# Print the coefficients and intercept for linear regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
Coefficients: [5942.44830928,-8073.06894164,81071.1787768]
Intercept: 48473.6291555

numFolds = 10
evaluator = RegressionEvaluator(predictionCol="prediction",
labelCol="label", metricName="rmse") 
pipeline = Pipeline(stages=[lrModel])

crossval = CrossValidator(
estimator=pipeline,
estimatorParamMaps=ParamGridBuilder().build(),
evaluator=evaluator,
numFolds=numFolds)

CVModel = crossval.fit(regressionDF)
bestModel = CVModel.bestModel
cvPrediction = CVModel.transform(regressionDF).select("label", "prediction")

cvPrediction.show(5)
+---+--+
|  label|prediction|
+---+--+
|59222.0|140493.58824997063|
|68212.0| 171442.7987818182|
|68880.0|135608.61939589953|
|69307.0|142447.57579159905|
|81900.0|135730.74361725134|
+---+--+
only showing top 5 rows

*##
#End Code
##*

So the code seems to work but I'm unsure as to how to do the following
things:

*1)* I want to name every "column" of my feature vector. Then when it shows
me the coefficients, I can see what each column refers to?

*2)* How can I see the best model selected from the CV model? I realize this
might not be applicable for something like random forests, but for linear
regression this might be quite useful?

*3)* Is there a way to specify the range it considers for the coefficients
of each feature (for instance, I want feature 2 to have a coefficient no
larger than some number Beta_2)? I realize that with Lasso we can constrain
all the features using the L1 norm, but what about a specific feature?

*4)* What's the easiest way to get something like R^2 or adjusted R^2? I
can code it manually, but is any of it built in?

Thank you for your help!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-name-features-and-perform-custom-cross-validation-in-ML-tp26545.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Building spark submodule source code

2016-03-21 Thread Akhil Das
Have a look at the IntelliJ setup:
https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IntelliJ
Once you have the setup ready, you don't have to recompile the whole thing
every time.

Thanks
Best Regards

On Mon, Mar 21, 2016 at 8:14 AM, Tenghuan He  wrote:

> Hi everyone,
>
> I am trying to add a new method to spark RDD. After changing the code
> of RDD.scala and running the following command
> mvn -pl :spark-core_2.10 -DskipTests clean install
> It BUILD SUCCESS, however, when starting the bin\spark-shell, my
> method cannot be found.
> Do I have to rebuild the whole spark project instead the spark-core
> submodule to make the changes work?
> Rebuiling the whole project is too time consuming, is there any better
> choice?
>
>
> Thanks & Best Regards
>
> Tenghuan He
>
>


Re: Error using collectAsMap() in scala

2016-03-21 Thread Akhil Das
What you should be doing is a join, something like this:

// Create a key, value pair, the key being column1 of file1
val rdd1 = sc.textFile(file1).map(x => (x.split(",")(0), x.split(",")))

// Create a key, value pair, the key being column2 of file2
val rdd2 = sc.textFile(file2).map(x => (x.split(",")(1), x.split(",")))

// Now join the datasets
val joined = rdd1.join(rdd2)

// Now do the replacement
val replaced = joined.map(...)
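For illustration, the replacement step could then look roughly like this
(assuming the 3-column/2-column layout described above):

val replaced = joined.map { case (key, (cols1, cols2)) =>
  // Put column1 of file2 in place of column1 of file1 and keep the rest of
  // file1's row.
  (cols2(0), cols1(1), cols1(2))
}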





Thanks
Best Regards

On Mon, Mar 21, 2016 at 10:31 AM, Shishir Anshuman <
shishiranshu...@gmail.com> wrote:

> I have stored the contents of two csv files in separate RDDs.
>
> file1.csv format*: (column1,column2,column3)*
> file2.csv format*: (column1, column2)*
>
> *column1 of file1 *and* column2 of file2 *contains similar data. I want
> to compare the two columns and if match is found:
>
>- Replace the data at *column1(file1)* with the* column1(file2)*
>
>
> For this reason, I am not using normal RDD.
>
> I am still new to apache spark, so any suggestion will be greatly
> appreciated.
>
> On Mon, Mar 21, 2016 at 10:09 AM, Prem Sure  wrote:
>
>> any specific reason you would like to use collectasmap only? You probably
>> move to normal RDD instead of a Pair.
>>
>>
>> On Monday, March 21, 2016, Mark Hamstra  wrote:
>>
>>> You're not getting what Ted is telling you.  Your `dict` is an
>>> RDD[String]  -- i.e. it is a collection of a single value type, String.
>>> But `collectAsMap` is only defined for PairRDDs that have key-value pairs
>>> for their data elements.  Both a key and a value are needed to collect into
>>> a Map[K, V].
>>>
>>> On Sun, Mar 20, 2016 at 8:19 PM, Shishir Anshuman <
>>> shishiranshu...@gmail.com> wrote:
>>>
 yes I have included that class in my code.
 I guess its something to do with the RDD format. Not able to figure out
 the exact reason.

 On Fri, Mar 18, 2016 at 9:27 AM, Ted Yu  wrote:

> It is defined in:
> core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
>
> On Thu, Mar 17, 2016 at 8:55 PM, Shishir Anshuman <
> shishiranshu...@gmail.com> wrote:
>
>> I am using following code snippet in scala:
>>
>>
>> *val dict: RDD[String] = sc.textFile("path/to/csv/file")*
>> *val dict_broadcast=sc.broadcast(dict.collectAsMap())*
>>
>> On compiling It generates this error:
>>
>> *scala:42: value collectAsMap is not a member of
>> org.apache.spark.rdd.RDD[String]*
>>
>>
>> *val dict_broadcast=sc.broadcast(dict.collectAsMap())
>> ^*
>>
>
>

>>>
>


Run External R script from Spark

2016-03-21 Thread sujeet jog
Hi,

I have been working on a POC on some time-series-related stuff. I'm using
Python since I need Spark Streaming and SparkR does not yet have a Spark
Streaming front end, and a couple of algorithms I want to use are not yet
present in the Spark-TS package. So I'm thinking of invoking an external R
script for the algorithm part and passing the data from Spark to the R
script via pipeRDD.


What I wanted to understand is whether something like this can be used in a
production deployment, since passing the data via the R script would mean a
lot of serializing and would not actually use the power of Spark for
parallel execution.

Has anyone used this kind of workaround: Spark -> pipeRDD -> R script?


Thanks,
Sujeet