Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Sean Owen
Hm, now I wonder if it's the same issue here:
https://issues.apache.org/jira/browse/SPARK-10149

Does the setting described there help?

On Mon, Oct 26, 2015 at 11:39 AM, Jinfeng Li  wrote:

> Hi, I have already tried the same code with Spark 1.3.1, there is no such
> problem. The configuration files are all directly copied from Spark 1.5.1.
> I feel it is a bug in Spark 1.5.1.
>
> Thanks a lot for your response.
>
> On Mon, Oct 26, 2015 at 7:21 PM Sean Owen  wrote:
>
>> Yeah, are these stats actually reflecting data read locally, like through
>> the loopback interface? I'm also no expert on the internals here but this
>> may be measuring effectively local reads. Or are you sure it's not?
>>
>> On Mon, Oct 26, 2015 at 11:14 AM, Steve Loughran 
>> wrote:
>>
>>>
>>> > On 26 Oct 2015, at 09:28, Jinfeng Li  wrote:
>>> >
>>> > Replication factor is 3 and we have 18 data nodes. We check HDFS
>>> webUI, data is evenly distributed among 18 machines.
>>> >
>>>
>>>
>>> every block in HDFS (usually 64-128-256 MB) is distributed across three
>>> machines, meaning 3 machines have it local, 15 have it remote.
>>>
>>> for data locality to work properly, you need the executors to be reading
>>> in the blocks of data local to them, and not data from other parts of the
>>> files. Spark does try to do locality, but if there's only a limited set of
>>> executors, then more of the workload is remote vs local.
>>>
>>> I don't know of an obvious way to get the metrics here of local vs
>>> remote; I don't see the HDFS client library tracking that —though it should
>>> be the place to collect stats on local/remote/domain-socket-direct IO. Does
>>> anyone know somewhere in the Spark metrics which tracks placement locality?
>>> If not, both layers could have some more metrics added.
>>
>>
>>


Re: [Spark Streaming] How do we reset the updateStateByKey values.

2015-10-26 Thread Adrian Tanase
Have you considered union-ing the 2 streams? Basically you can consider them as 
2 “message types” that your update function can consume (e.g. implement a 
common interface):

  *   regularUpdate
  *   resetStateUpdate

Inside your updateStateByKey you can check if any of the messages in the list 
of updates is a resetState message. If not, continue summing the others.

I can provide scala samples, my java is beyond rusty :)
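
A rough Scala sketch of that union idea (the message types and names here are
illustrative, not from the thread):

// two message types implementing a common trait
sealed trait Update
case class RegularUpdate(value: Long) extends Update
case object ResetStateUpdate extends Update

// both source streams mapped to (key, Update) and unioned:
//   val merged = regularStream.union(resetStream)   // DStream[(String, Update)]

// reset the running sum for a key whenever a reset message shows up in the batch
def updateFn(msgs: Seq[Update], state: Option[Long]): Option[Long] = {
  val base = if (msgs.contains(ResetStateUpdate)) 0L else state.getOrElse(0L)
  Some(base + msgs.collect { case RegularUpdate(v) => v }.sum)
}

//   val updated = merged.updateStateByKey(updateFn _)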

-adrian

From: Uthayan Suthakar
Date: Friday, October 23, 2015 at 2:10 PM
To: Sander van Dijk
Cc: user
Subject: Re: [Spark Streaming] How do we reset the updateStateByKey values.

Hi Sander,

Thank you for your very informative email. From your email, I've learned 
quite a bit.

>>>Is the condition determined somehow from the data coming through streamLogs, 
>>>and is newData streamLogs again (rather than a whole data source?)

No, they are two different streams. I have two stream receivers, one of which 
sends events regularly while the other does not (its data is computed by 
another application and stored in HDFS). What I'm trying to do is pick up the 
data from HDFS and overwrite the stream's state. Hence the overwrite should 
only take place if there are new files in HDFS.

So we have two different RDDs. If no file is found in HDFS, it will simply read 
the regular stream, compute and update the state(1) and output the result. If 
there is a file found in HDFS, then it should overwrite the state (1) with the 
data found from HDFS so the new events from the regular stream will carry on 
with the new overwritten state.

I managed to get most of it done; the only remaining issue is overwriting 
the state.



On 22 October 2015 at 19:35, Sander van Dijk 
> wrote:
I don't think it is possible in the way you try to do it. It is important to 
remember that the statements you mention only set up the stream stages, before 
the stream is actually running. Once it's running, you cannot change, remove or 
add stages.

I am not sure how you determine your condition and what the actual change 
should be when that condition is met: you say you want a different update 
function, but then give a statement with the same update function and a 
different source stream. Is the condition determined somehow from the data 
coming through streamLogs, and is newData basically streamLogs again (rather 
than a whole new data source)? In that case I can think of 3 things to try:

- if the condition you switch on can be determined independently from every 
item in streamLogs, you can simply do an if/else inside updateResultsStream to 
change the way you determine your state
- if this is not the case, but you can determine when to switch your condition 
for each key independently, you can extend your state type to also keep track 
of your condition: rather than using JavaPairDStream<Key, Value> you make 
updatedResultsState a JavaPairDStream<Key, Pair<Value, Boolean>> (assuming 
you have some class Pair), and you make updateResultsStream update and check 
the state of the boolean (see the sketch after this list).
- finally, you can have a separate state stream that keeps track of your 
condition globally, then join that with you main stream and use that to update 
state. Something like:

// determineCondition should result in a reduction to a single item that 
signals whether the condition is met in the current batch, updateContitionState 
should remember that
conditionStateStream = 
streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState)

// addCondition gets RDDs from streamLogs and  single-item RDDs with the 
condition state and should add that state to each item in the streamLogs RDD
joinedStream = streamLogs.transformWith(conditionStateStream, addCondition)

// This is similar to the extend state type of the previous idea, but now your 
condition state is determined globally rather than per log entry
updatedResultsState = joinedStream.updateStateByKey(updateResultsStream)
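
A rough Scala sketch of the second idea above, where the per-key state itself 
carries the condition flag (types, names and the exact reset semantics are 
illustrative, not from the thread):

case class StateWithFlag(sum: Long, conditionMet: Boolean)

// each update is (value, flag); a flagged update restarts the sum for that key
def updateWithFlag(updates: Seq[(Long, Boolean)],
                   state: Option[StateWithFlag]): Option[StateWithFlag] = {
  val prev = state.getOrElse(StateWithFlag(0L, conditionMet = false))
  val base = if (updates.exists(_._2)) 0L else prev.sum
  Some(StateWithFlag(base + updates.filterNot(_._2).map(_._1).sum, conditionMet = false))
}

// stream: DStream[(String, (Long, Boolean))]
//   val updatedResultsState = stream.updateStateByKey(updateWithFlag _)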

I hope this applies to your case and that it makes sense, my Java is a bit 
rusty :) and perhaps others can suggest better spark streaming methods that can 
be used, but hopefully the idea is clear.

Sander

On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar 
> wrote:
Hello guys,

I have a stream job that will carryout computations and update the state (SUM 
the value). At some point, I would like to reset the state. I could drop the 
state by setting 'None' but I don't want to drop it. I would like to keep the 
state but update the state.


For example:

JavaPairDStream updatedResultsState = 
streamLogs.updateStateByKey(updateResultsStream);

Under some condition, I would like to update the state by key but with 
different values, hence a different update function.


e.g.

 updatedResultsState = newData.updateStateByKey(resetResultsStream);

But the newData.updateStateByKey values cannot be 

Re: [SPARK MLLIB] could not understand the wrong and inscrutable result of Linear Regression codes

2015-10-26 Thread Zhiliang Zhu
Hi Meihua, DB Tsai,

Thanks very much for all your kind help. After I added some more LabeledPoints 
to the training data, the output also looks much better. I will also try the 
setFitIntercept(false) approach.

Currently I have run into an algorithm optimization problem: maximize 
f(x1, x2, ..., xn) = a11 * x1 * x1 + a12 * x1 * x2 + a22 * x2 * x2 + ... + ann * xn * xn, 
subject to the constraint equations b1 * x1 + b2 * x2 + ... + bn * xn = 1, 
xi >= 0, etc. That is, find the x = [x1, x2, ..., xn] that makes 
f(x1, x2, ..., xn) largest.

It is required to use Spark for this, but I am not familiar with applying Spark 
directly to optimization problems, and I am not yet skilled at using gradient 
descent on a multi-dimensional function. If you know this area, would you 
comment on it?

I converted the problem into one of solving systems of linear equations 
c1 * w1 + c2 * w2 + ... + cn * wn = d. Viewing c and w conversely, as 
w1 * c1 + w2 * c2 + ... + wn * cn = d, w becomes the coefficient vector and c 
the variables, so I think Spark Linear Regression would be helpful here.

Sujit also kindly pointed out the way to compute the pseudo-inverse of A for 
Ax = b; I will try that next as well.

Since I would use Spark to solve this, and as you said breeze could be used 
here, would you help explain or point me to the way to use it here...

Thank you very much!
Zhiliang



 On Monday, October 26, 2015 2:58 PM, Meihua Wu 
 wrote:
   

 please add "setFitIntercept(false)" to your LinearRegression.

LinearRegression by default includes an intercept in the model, e.g.
label = intercept + features dot weight

To get the result you want, you need to force the intercept to be zero.
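
A minimal sketch of that on the Spark ML API (1.5+), reusing the `training` 
DataFrame from the code in this thread; the parameter values are just examples:

import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.DataFrame

def fitWithoutIntercept(training: DataFrame) = {
  val lr = new LinearRegression()
    .setFitIntercept(false)   // force the intercept to zero
    .setMaxIter(100)
    .setRegParam(0.0)
    .setElasticNetParam(0.0)
  val model = lr.fit(training)
  println(s"weights: ${model.weights} intercept: ${model.intercept}")
  model
}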

Just curious, are you trying to solve systems of linear equations? If
so, you can probably try breeze.
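
For the small system in this thread, a breeze sketch could be as direct as 
solving A x = b on the driver (values taken from the LabeledPoints quoted below):

import breeze.linalg.{DenseMatrix, DenseVector}

val A = DenseMatrix(
  (1.0, 2.0,  3.0, 4.0),
  (0.0, 2.0,  3.0, 4.0),
  (0.0, 0.0,  3.0, 4.0),
  (0.0, 0.0, -1.0, 0.0))
val b = DenseVector(30.0, 29.0, 25.0, -3.0)
val x = A \ b          // solves the linear system
println(x)             // expect approximately DenseVector(1.0, 2.0, 3.0, 4.0)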



On Sun, Oct 25, 2015 at 9:10 PM, Zhiliang Zhu
 wrote:
>
>
>
> On Monday, October 26, 2015 11:26 AM, Zhiliang Zhu
>  wrote:
>
>
> Hi DB Tsai,
>
> Thanks very much for your kind help. I  get it now.
>
> I am sorry that there is another issue, the weight/coefficient result is
> perfect while A is triangular matrix, however, while A is not triangular
> matrix (but
> transformed from triangular matrix, still is invertible), the result seems
> not perfect and difficult to make it better by resetting the parameter.
> Would you help comment some about that...
>
> List localTraining = Lists.newArrayList(
>      new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)),
>      new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)),
>      new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)),
>      new LabeledPoint(-3.0, Vectors.dense(0.0, 0.0, -1.0, 0.0)));
> ...
> LinearRegression lr = new LinearRegression()
>      .setMaxIter(2)
>      .setRegParam(0)
>      .setElasticNetParam(0);
> 
>
> --
>
> It seems that no matter how to reset the parameters for lr , the output of
> x3 and x4 is always nearly the same .
> Whether there is some way to make the result a little better...
>
>
> --
>
> x3 and x4 could not become better, the output is:
> Final w:
> [0.999477672867,1.999748740578,3.500112393734,3.50011239377]
>
> Thank you,
> Zhiliang
>
>
>
> On Monday, October 26, 2015 10:25 AM, DB Tsai  wrote:
>
>
> Column 4 is always constant, so no predictive power resulting zero weight.
>
> On Sunday, October 25, 2015, Zhiliang Zhu  wrote:
>
> Hi DB Tsai,
>
> Thanks very much for your kind reply help.
>
> As for your comment, I just modified and tested the key part of the codes:
>
>  LinearRegression lr = new LinearRegression()
>        .setMaxIter(1)
>        .setRegParam(0)
>        .setElasticNetParam(0);  //the number could be reset
>
>  final LinearRegressionModel model = lr.fit(training);
>
> Now the output is much reasonable, however, x4 is always 0 while repeatedly
> reset those parameters in lr , would you help some about it how to properly
> set the parameters ...
>
> Final w: [1.00127825909,1.99979185054,2.3307136,0.0]
>
> Thank you,
> Zhiliang
>
>
>
>
> On Monday, October 26, 2015 5:14 AM, DB Tsai  wrote:
>
>
> LinearRegressionWithSGD is not stable. Please use linear regression in
> ML package instead.
> http://spark.apache.org/docs/latest/ml-linear-methods.html
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP 

Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Sean Owen
-dev +user
How are you measuring network traffic?
It's not in general true that there will be zero network traffic, since not
all executors are local to all data. That can be the situation in many
cases but not always.

On Mon, Oct 26, 2015 at 8:57 AM, Jinfeng Li  wrote:

> Hi, I find that loading files from HDFS can incur huge amount of network
> traffic. Input size is 90G and network traffic is about 80G. By my
> understanding, local files should be read and thus no network communication
> is needed.
>
> I use Spark 1.5.1, and the following is my code:
>
> val textRDD = sc.textFile("hdfs://master:9000/inputDir")
> textRDD.count
>
> Jeffrey
>


Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Sean Owen
Hm, how about the opposite question -- do you have just 1 executor? then
again everything will be remote except for a small fraction of blocks.

On Mon, Oct 26, 2015 at 9:28 AM, Jinfeng Li  wrote:

> Replication factor is 3 and we have 18 data nodes. We check HDFS webUI,
> data is evenly distributed among 18 machines.
>
>
> On Mon, Oct 26, 2015 at 5:18 PM Sean Owen  wrote:
>
>> Have a look at your HDFS replication, and where the blocks are for these
>> files. For example, if you had only 2 HDFS data nodes, then data would be
>> remote to 16 of 18 workers and always entail a copy.
>>
>> On Mon, Oct 26, 2015 at 9:12 AM, Jinfeng Li  wrote:
>>
>>> I cat /proc/net/dev and then take the difference of received bytes
>>> before and after the job. I also see a long-time peak (nearly 600Mb/s) in
>>> nload interface.  We have 18 machines and each machine receives 4.7G bytes.
>>>
>>> On Mon, Oct 26, 2015 at 5:00 PM Sean Owen  wrote:
>>>
 -dev +user
 How are you measuring network traffic?
 It's not in general true that there will be zero network traffic, since
 not all executors are local to all data. That can be the situation in many
 cases but not always.

 On Mon, Oct 26, 2015 at 8:57 AM, Jinfeng Li  wrote:

> Hi, I find that loading files from HDFS can incur huge amount of
> network traffic. Input size is 90G and network traffic is about 80G. By my
> understanding, local files should be read and thus no network 
> communication
> is needed.
>
> I use Spark 1.5.1, and the following is my code:
>
> val textRDD = sc.textFile("hdfs://master:9000/inputDir")
> textRDD.count
>
> Jeffrey
>


>>


Re: Secondary Sorting in Spark

2015-10-26 Thread Adrian Tanase
Do you have a particular concern? You’re always using a partitioner (the 
default is HashPartitioner), and the Partitioner interface is pretty light, so 
I can’t see how it could affect performance.

Used correctly it should improve performance as you can better control 
placement of data and avoid shuffling…
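
As a rough sketch, a custom partitioner for secondary sort usually just hashes 
the "natural" part of a composite key, so every record for that key lands in 
the same partition (the key shape here is illustrative):

import org.apache.spark.Partitioner

class NaturalKeyPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (naturalKey, _) => math.abs(naturalKey.hashCode) % numPartitions
    case other           => math.abs(other.hashCode) % numPartitions
  }
}

// usage, for an RDD[((NaturalKey, SortKey), Value)] with an Ordering on the key:
//   rdd.repartitionAndSortWithinPartitions(new NaturalKeyPartitioner(64))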

-adrian

From: swetha kasireddy
Date: Monday, October 26, 2015 at 6:56 AM
To: Adrian Tanase
Cc: Bill Bejeck, "user@spark.apache.org"
Subject: Re: Secondary Sorting in Spark

Hi,

Does the use of custom partitioner in Streaming affect performance?

On Mon, Oct 5, 2015 at 1:06 PM, Adrian Tanase 
> wrote:
Great article, especially the use of a custom partitioner.

Also, sorting by multiple fields by creating a tuple out of them is an awesome, 
easy to miss, Scala feature.
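
A tiny illustration of that (assuming an existing SparkContext `sc`; the data 
is made up):

def sortByUserThenTime(sc: org.apache.spark.SparkContext) = {
  val events = sc.parallelize(Seq(("bob", 2L, 1.0), ("alice", 5L, 2.0), ("alice", 1L, 3.0)))
  // keying on a tuple gives lexicographic ordering: by user first, then timestamp
  events.sortBy { case (user, ts, _) => (user, ts) }.collect()
}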

Sent from my iPhone

On 04 Oct 2015, at 21:41, Bill Bejeck 
> wrote:

I've written blog post on secondary sorting in Spark and I'd thought I'd share 
it with the group

http://codingjunkie.net/spark-secondary-sort/

Thanks,
Bill



Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-26 Thread Adrian Tanase
If I understand the order correctly, not really. First of all, the easiest way 
to make sure it works as expected is to check out the visual DAG in the spark 
UI.

It should map 1:1 to your code, and since I don’t see any shuffles in the 
operations below it should execute all in one stage, forking after X.
That means that all the executor cores will each process a partition completely 
in isolation, most likely 3 tasks (A, B, X2). Most likely in the order you 
define in code although depending on the data some tasks may get skipped or 
moved around.

I’m curious – why do you ask? Do you have a particular concern or use case that 
relies on ordering between A, B and X2?

-adrian

From: Nipun Arora
Date: Sunday, October 25, 2015 at 4:09 PM
To: Andy Dang
Cc: user
Subject: Re: [SPARK STREAMING] Concurrent operations in spark streaming

So essentially the driver/client program needs to explicitly have two threads 
to ensure concurrency?

What happens when the program is sequential... I.e. I execute function A and 
then function B. Does this mean that each RDD first goes through function A, 
and then stream X is persisted, but processed in function B only after the RDD 
has been processed by A?

Thanks
Nipun
On Sat, Oct 24, 2015 at 5:32 PM Andy Dang 
> wrote:
If you execute the collect step (foreach in 1, possibly reduce in 2) in two 
threads in the driver then both of them will be executed in parallel. Whichever 
gets submitted to Spark first gets executed first - you can use a semaphore if 
you need to ensure the ordering of execution, though I would assume that the 
ordering wouldn't matter.
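
A rough sketch of that two-thread pattern using Scala futures in the driver 
(RDD types and output paths are illustrative):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.rdd.RDD

def runBothActions(z: RDD[String], x2: RDD[Int]): Unit = {
  val jobA = Future { z.saveAsTextFile("/tmp/streamZ-out") }    // function A's output
  val jobB = Future { x2.saveAsTextFile("/tmp/streamX2-out") }  // function B's output
  Await.result(Future.sequence(Seq(jobA, jobB)), Duration.Inf)  // wait for both jobs
}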

---
Regards,
Andy

On Sat, Oct 24, 2015 at 10:08 PM, Nipun Arora 
> wrote:
I wanted to understand something about the internals of spark streaming 
executions.

If I have a stream X, and in my program I send stream X to function A and 
function B:

1. In function A, I do a few transform/filter operations etc. on X->Y->Z to 
create stream Z. Now I do a forEach Operation on Z and print the output to a 
file.

2. Then in function B, I reduce stream X -> X2 (say min value of each RDD), and 
print the output to file

Are both functions being executed for each RDD in parallel? How does it work?

Thanks
Nipun




Re: Accumulators internals and reliability

2015-10-26 Thread Adrian Tanase
I can reply from a user’s perspective – I defer on the semantic guarantees to 
someone with more experience.

I’ve successfully implemented the following using a custom Accumulable class:

  *   Created a MapAccumulator with dynamic keys (they are driven by the data 
coming in), as opposed to creating many discrete accumulators
 *   The merge operation adds the values on key conflict (see the sketch below)
  *   I’m adding K->Vs to this accumulator in a variety of places (maps, 
flatmaps, transforms and updateStateBy key)
  *   In a foreachRdd at the end of the transformations I’m reading the 
accumulator and writing the counters to OpenTSDB
 *   after this I’m resetting it to the “zero” value (e.g. Empty map)
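
A rough sketch of that map-style accumulator on the Spark 1.x Accumulable API 
(key/value types are illustrative, not the actual implementation from this thread):

import org.apache.spark.AccumulableParam

object MapAccumulatorParam extends AccumulableParam[Map[String, Long], (String, Long)] {
  def zero(initial: Map[String, Long]): Map[String, Long] = Map.empty

  // add one (key, count) pair, summing on key conflict
  def addAccumulator(acc: Map[String, Long], kv: (String, Long)): Map[String, Long] =
    acc + (kv._1 -> (acc.getOrElse(kv._1, 0L) + kv._2))

  // merge two partial maps, summing values on key conflict
  def addInPlace(m1: Map[String, Long], m2: Map[String, Long]): Map[String, Long] =
    m2.foldLeft(m1) { case (acc, (k, v)) => acc + (k -> (acc.getOrElse(k, 0L) + v)) }
}

// usage:
//   val counters = sc.accumulable(Map.empty[String, Long])(MapAccumulatorParam)
//   rdd.foreach { record => counters += ("someKey" -> 1L) }
//   counters.value   // read on the driver, e.g. inside foreachRDD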

Everything works as expected in terms of functionality - with 2 caveats:

  *   On task/job failure you might get duplicate values for the tasks that are 
retried in the active job since adding to an Accumulator in a transformation is 
a side effect
 *   I’m partially working around this by also referring to the RDD time 
and overwriting the values in OpenTSDB (idempotent operation)
  *   If you have stateful transformations and you use checkpointing, the 
accumulator code becomes really intrusive in your codebase
 *   You will need to have a global singleton in your driver and 
“getInstance” in a foreachRdd or transform, to force code execution on the 
driver
 *   This is because on restoring from checkpoint your accumulators will be 
NULL as the checkpoint recovery makes no attempt to initialize them (See 
SPARK-5206)

Hope this helps,
-adrian

From: "Sela, Amit"
Date: Monday, October 26, 2015 at 11:13 AM
To: "user@spark.apache.org"
Subject: Accumulators internals and reliability

It seems like there is not much literature about Spark's Accumulators so I 
thought I'd ask here:

Do Accumulators reside in a Task ? Are they being serialized with the task ? 
Sent back on task completion as part of the ResultTask ?

Are they reliable? If so, when? Can I rely on an accumulator's value only after 
the task has successfully completed (meaning in the driver)? Or also during 
task execution (what about speculative execution)?

What are the limitations on the number (or size) of Accumulators ?

Thanks,
Amit


Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-26 Thread Adrian Tanase
Thinking more about it – it should only be 2 tasks as A and B are most likely 
collapsed by spark in a single task.

Again – learn to use the spark UI as it’s really informative. The combination 
of DAG visualization and task count should answer most of your questions.

-adrian

From: Adrian Tanase
Date: Monday, October 26, 2015 at 11:57 AM
To: Nipun Arora, Andy Dang
Cc: user
Subject: Re: [SPARK STREAMING] Concurrent operations in spark streaming

If I understand the order correctly, not really. First of all, the easiest way 
to make sure it works as expected is to check out the visual DAG in the spark 
UI.

It should map 1:1 to your code, and since I don’t see any shuffles in the 
operations below it should execute all in one stage, forking after X.
That means that all the executor cores will each process a partition completely 
in isolation, most likely 3 tasks (A, B, X2). Most likely in the order you 
define in code although depending on the data some tasks may get skipped or 
moved around.

I’m curious – why do you ask? Do you have a particular concern or use case that 
relies on ordering between A, B and X2?

-adrian

From: Nipun Arora
Date: Sunday, October 25, 2015 at 4:09 PM
To: Andy Dang
Cc: user
Subject: Re: [SPARK STREAMING] Concurrent operations in spark streaming

So essentially the driver/client program needs to explicitly have two threads 
to ensure concurrency?

What happens when the program is sequential... I.e. I execute function A and 
then function B. Does this mean that each RDD first goes through function A, 
and then stream X is persisted, but processed in function B only after the RDD 
has been processed by A?

Thanks
Nipun
On Sat, Oct 24, 2015 at 5:32 PM Andy Dang 
> wrote:
If you execute the collect step (foreach in 1, possibly reduce in 2) in two 
threads in the driver then both of them will be executed in parallel. Whichever 
gets submitted to Spark first gets executed first - you can use a semaphore if 
you need to ensure the ordering of execution, though I would assume that the 
ordering wouldn't matter.

---
Regards,
Andy

On Sat, Oct 24, 2015 at 10:08 PM, Nipun Arora 
> wrote:
I wanted to understand something about the internals of spark streaming 
executions.

If I have a stream X, and in my program I send stream X to function A and 
function B:

1. In function A, I do a few transform/filter operations etc. on X->Y->Z to 
create stream Z. Now I do a forEach Operation on Z and print the output to a 
file.

2. Then in function B, I reduce stream X -> X2 (say min value of each RDD), and 
print the output to file

Are both functions being executed for each RDD in parallel? How does it work?

Thanks
Nipun




Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Jinfeng Li
I use standalone mode. Each machine has 4 workers. Spark is deployed
correctly, as the web UI and the jps command show.

Actually, we are a team and have been using Spark for nearly half a year,
starting from Spark 1.3.1. We found this problem in one of our applications,
and I wrote a simple program to demonstrate it. So could the Spark team try
this for us? If you don't see this problem, we will debug it ourselves.

Jeffrey

On Mon, Oct 26, 2015 at 5:32 PM Sean Owen  wrote:

> Hm, how about the opposite question -- do you have just 1 executor? then
> again everything will be remote except for a small fraction of blocks.
>
> On Mon, Oct 26, 2015 at 9:28 AM, Jinfeng Li  wrote:
>
>> Replication factor is 3 and we have 18 data nodes. We check HDFS webUI,
>> data is evenly distributed among 18 machines.
>>
>>
>> On Mon, Oct 26, 2015 at 5:18 PM Sean Owen  wrote:
>>
>>> Have a look at your HDFS replication, and where the blocks are for these
>>> files. For example, if you had only 2 HDFS data nodes, then data would be
>>> remote to 16 of 18 workers and always entail a copy.
>>>
>>> On Mon, Oct 26, 2015 at 9:12 AM, Jinfeng Li  wrote:
>>>
 I cat /proc/net/dev and then take the difference of received bytes
 before and after the job. I also see a long-time peak (nearly 600Mb/s) in
 nload interface.  We have 18 machines and each machine receives 4.7G bytes.

 On Mon, Oct 26, 2015 at 5:00 PM Sean Owen  wrote:

> -dev +user
> How are you measuring network traffic?
> It's not in general true that there will be zero network traffic,
> since not all executors are local to all data. That can be the situation 
> in
> many cases but not always.
>
> On Mon, Oct 26, 2015 at 8:57 AM, Jinfeng Li  wrote:
>
>> Hi, I find that loading files from HDFS can incur huge amount of
>> network traffic. Input size is 90G and network traffic is about 80G. By 
>> my
>> understanding, local files should be read and thus no network 
>> communication
>> is needed.
>>
>> I use Spark 1.5.1, and the following is my code:
>>
>> val textRDD = sc.textFile("hdfs://master:9000/inputDir")
>> textRDD.count
>>
>> Jeffrey
>>
>
>
>>>
>


Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Sean Owen
Yeah, are these stats actually reflecting data read locally, like through
the loopback interface? I'm also no expert on the internals here but this
may be measuring effectively local reads. Or are you sure it's not?

On Mon, Oct 26, 2015 at 11:14 AM, Steve Loughran 
wrote:

>
> > On 26 Oct 2015, at 09:28, Jinfeng Li  wrote:
> >
> > Replication factor is 3 and we have 18 data nodes. We check HDFS webUI,
> data is evenly distributed among 18 machines.
> >
>
>
> every block in HDFS (usually 64-128-256 MB) is distributed across three
> machines, meaning 3 machines have it local, 15 have it remote.
>
> for data locality to work properly, you need the executors to be reading
> in the blocks of data local to them, and not data from other parts of the
> files. Spark does try to do locality, but if there's only a limited set of
> executors, then more of the workload is remote vs local.
>
> I don't know of an obvious way to get the metrics here of local vs remote;
> I don't see the HDFS client library tracking that —though it should be the
> place to collect stats on local/remote/domain-socket-direct IO. Does anyone
> know somewhere in the Spark metrics which tracks placement locality? If
> not, both layers could have some more metrics added.


Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Steve Loughran

> On 26 Oct 2015, at 09:28, Jinfeng Li  wrote:
> 
> Replication factor is 3 and we have 18 data nodes. We check HDFS webUI, data 
> is evenly distributed among 18 machines. 
> 


every block in HDFS (usually 64-128-256 MB) is distributed across three 
machines, meaning 3 machines have it local, 15 have it remote.

for data locality to work properly, you need the executors to be reading in the 
blocks of data local to them, and not data from other parts of the files. Spark 
does try to do locality, but if there's only a limited set of executors, then 
more of the workload is remote vs local.

I don't know of an obvious way to get the metrics here of local vs remote; I 
don't see the HDFS client library tracking that —though it should be the place 
to collect stats on local/remote/domain-socket-direct IO. Does anyone know 
somewhere in the Spark metrics which tracks placement locality? If not, both 
layers could have some more metrics added.
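
Not a metrics answer, but one scheduler knob that influences how much of the 
read ends up remote is the locality wait; a hedged sketch (these are standard 
Spark settings, default 3s, and the values here are just examples):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "10s")       // wait longer for a local slot
  .set("spark.locality.wait.node", "10s")  // node-local level specifically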
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Jinfeng Li
Replication factor is 3 and we have 18 data nodes. We check HDFS webUI,
data is evenly distributed among 18 machines.


On Mon, Oct 26, 2015 at 5:18 PM Sean Owen  wrote:

> Have a look at your HDFS replication, and where the blocks are for these
> files. For example, if you had only 2 HDFS data nodes, then data would be
> remote to 16 of 18 workers and always entail a copy.
>
> On Mon, Oct 26, 2015 at 9:12 AM, Jinfeng Li  wrote:
>
>> I cat /proc/net/dev and then take the difference of received bytes before
>> and after the job. I also see a long-time peak (nearly 600Mb/s) in nload
>> interface.  We have 18 machines and each machine receives 4.7G bytes.
>>
>> On Mon, Oct 26, 2015 at 5:00 PM Sean Owen  wrote:
>>
>>> -dev +user
>>> How are you measuring network traffic?
>>> It's not in general true that there will be zero network traffic, since
>>> not all executors are local to all data. That can be the situation in many
>>> cases but not always.
>>>
>>> On Mon, Oct 26, 2015 at 8:57 AM, Jinfeng Li  wrote:
>>>
 Hi, I find that loading files from HDFS can incur huge amount of
 network traffic. Input size is 90G and network traffic is about 80G. By my
 understanding, local files should be read and thus no network communication
 is needed.

 I use Spark 1.5.1, and the following is my code:

 val textRDD = sc.textFile("hdfs://master:9000/inputDir")
 textRDD.count

 Jeffrey

>>>
>>>
>


Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Jinfeng Li
Hi, I have already tried the same code with Spark 1.3.1, there is no such
problem. The configuration files are all directly copied from Spark 1.5.1.
I feel it is a bug in Spark 1.5.1.

Thanks a lot for your response.

On Mon, Oct 26, 2015 at 7:21 PM Sean Owen  wrote:

> Yeah, are these stats actually reflecting data read locally, like through
> the loopback interface? I'm also no expert on the internals here but this
> may be measuring effectively local reads. Or are you sure it's not?
>
> On Mon, Oct 26, 2015 at 11:14 AM, Steve Loughran 
> wrote:
>
>>
>> > On 26 Oct 2015, at 09:28, Jinfeng Li  wrote:
>> >
>> > Replication factor is 3 and we have 18 data nodes. We check HDFS webUI,
>> data is evenly distributed among 18 machines.
>> >
>>
>>
>> every block in HDFS (usually 64-128-256 MB) is distributed across three
>> machines, meaning 3 machines have it local, 15 have it remote.
>>
>> for data locality to work properly, you need the executors to be reading
>> in the blocks of data local to them, and not data from other parts of the
>> files. Spark does try to do locality, but if there's only a limited set of
>> executors, then more of the workload is remote vs local.
>>
>> I don't know of an obvious way to get the metrics here of local vs
>> remote; I don't see the HDFS client library tracking that —though it should
>> be the place to collect stats on local/remote/domain-socket-direct IO. Does
>> anyone know somewhere in the Spark metrics which tracks placement locality?
>> If not, both layers could have some more metrics added.
>
>
>


Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-26 Thread Fengdong Yu
How many partitions you generated?
if Millions generated, then there is a huge memory consumed.





> On Oct 26, 2015, at 10:58 AM, Jerry Lam  wrote:
> 
> Hi guys,
> 
> I mentioned that the partitions are generated, so I tried to read the 
> partition data from them. The driver goes OOM after a few minutes. The stack 
> trace is below. It looks very similar to the jstack above (note the refresh 
> method). Thanks!
> 
> Name: java.lang.OutOfMemoryError
> Message: GC overhead limit exceeded
> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
> java.lang.StringBuilder.append(StringBuilder.java:132)
> org.apache.hadoop.fs.Path.toString(Path.java:384)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
> org.apache.spark.sql.sources.HadoopFsRelation.org 
> $apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
> org.apache.spark.sql.sources.HadoopFsRelation.org 
> $apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org 
> $apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org 
> $apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
> org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:31)
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
> 
> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam  > wrote:
> Hi Josh,
> 
> No, I don't have speculation enabled. The driver took a few hours before it 
> went OOM. Interestingly, all partitions are generated successfully (the 
> _SUCCESS file is written in the output directory). Is there a reason why the 
> driver needs so much memory? The jstack revealed that it called refresh on 
> some file statuses. Is there a way to stop OutputCommitCoordinator from 
> using so much memory?
> 
> Ultimately, I choose to use partitions because most of the queries I have 
> will execute based the partition field. For example, "SELECT events from 
> customer where customer_id = 1234". If the partition is based on customer_id, 
> all events for a customer can be easily retrieved without filtering the 
> entire dataset which is much more efficient (I hope). However, I notice that 
> the implementation of the partition logic does not seem to allow this type of 
> use cases without using a lot of memory which is a bit odd in my opinion. Any 
> help will be greatly appreciated.
> 
> Best Regards,
> 
> Jerry
> 
> 
> 
> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen  > 

Re: [Yarn-Client]Can not access SparkUI

2015-10-26 Thread Deng Ching-Mallete
Hi Earthson,

Unfortunately, attachments aren't allowed on the list, so they seem to
have been removed from your email. Anyway, what happens when you click the
ApplicationMaster link?

Thanks,
Deng

On Mon, Oct 26, 2015 at 2:21 PM, Earthson  wrote:

> We are using Spark 1.5.1 with `--master yarn`, Yarn RM is running in HA
> mode.
>
> direct visit
>
>
>
>
> click ApplicationMaster link
>
>
>
>
> YARN RM log
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Client-Can-not-access-SparkUI-tp25197.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [Yarn-Client]Can not access SparkUI

2015-10-26 Thread Earthson Lu
it blocks until 500 Error:

HTTP ERROR 500

Problem accessing /proxy/application_1443146594954_0057/. Reason:

 (TimeOut, I guess)
Caused by:

java.net.ConnectException: 
at java.net.PlainSocketImpl.socketConnect(Native Method)
at 
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at java.net.Socket.(Socket.java:425)
at java.net.Socket.(Socket.java:280)

Yarn ResourceManager log

2015-10-26 11:30:13,328 INFO 
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=hdfs 
IP=123.103.21.82OPERATION=Submit Application Request
TARGET=ClientRMService  RESULT=SUCCESS  APPID=application_1443146594954_0057
2015-10-26 11:30:13,328 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: Storing 
application with id application_1443146594954_0057
2015-10-26 11:30:13,328 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: 
application_1443146594954_0057 State change from NEW to NEW_SAVING
2015-10-26 11:30:13,328 INFO 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore: Storing 
info for app: application_1443146594954_0057
2015-10-26 11:30:13,355 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: 
application_1443146594954_0057 State change from NEW_SAVING to SUBMITTED
2015-10-26 11:30:13,356 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler: 
Accepted application application_1443146594954_0057 from user: hdfs, in queue: 
default, currently num of applications: 3
2015-10-26 11:30:13,356 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: 
application_1443146594954_0057 State change from SUBMITTED to ACCEPTED
2015-10-26 11:30:13,356 INFO 
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: 
Registering app attempt : appattempt_1443146594954_0057_01
2015-10-26 11:30:13,356 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: 
appattempt_1443146594954_0057_01 State change from NEW to SUBMITTED
2015-10-26 11:30:13,356 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler: 
Added Application Attempt appattempt_1443146594954_0057_01 to scheduler 
from user: hdfs
2015-10-26 11:30:13,357 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: 
appattempt_1443146594954_0057_01 State change from SUBMITTED to SCHEDULED
2015-10-26 11:30:13,459 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: 
container_1443146594954_0057_01_01 Container Transitioned from NEW to 
ALLOCATED
2015-10-26 11:30:13,459 INFO 
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=hdfs 
OPERATION=AM Allocated ContainerTARGET=SchedulerApp RESULT=SUCCESS  
APPID=application_1443146594954_0057
CONTAINERID=container_1443146594954_0057_01_01
2015-10-26 11:30:13,459 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned 
container container_1443146594954_0057_01_01 of capacity  on host d190.mzhen.cn:52604, which has 6 containers,  used and  available after allocation
2015-10-26 11:30:13,460 INFO 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM:
 Sending NMToken for nodeId : d190.mzhen.cn:52604 for container : 
container_1443146594954_0057_01_01
2015-10-26 11:30:13,461 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: 
container_1443146594954_0057_01_01 Container Transitioned from ALLOCATED to 
ACQUIRED
2015-10-26 11:30:13,461 INFO 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM:
 Clear node set for appattempt_1443146594954_0057_01
2015-10-26 11:30:13,461 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: 
Storing attempt: AppId: application_1443146594954_0057 AttemptId: 
appattempt_1443146594954_0057_01 MasterContainer: Container: [ContainerId: 
container_1443146594954_0057_01_01, NodeId: d190.mzhen.cn:52604, 
NodeHttpAddress: d190.mzhen.cn:8042, Resource: , 
Priority: 0, Token: Token { kind: ContainerToken, service: 123.103.21.88:52604 
}, ]
2015-10-26 11:30:13,461 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: 
appattempt_1443146594954_0057_01 State change from SCHEDULED to 
ALLOCATED_SAVING
2015-10-26 11:30:13,505 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: 

Accumulators internals and reliability

2015-10-26 Thread Sela, Amit
It seems like there is not much literature about Spark's Accumulators so I 
thought I'd ask here:

Do Accumulators reside in a Task ? Are they being serialized with the task ? 
Sent back on task completion as part of the ResultTask ?

Are they reliable? If so, when? Can I rely on an accumulator's value only after 
the task has successfully completed (meaning in the driver)? Or also during 
task execution (what about speculative execution)?

What are the limitations on the number (or size) of Accumulators ?

Thanks,
Amit


[Yarn-Client]Can not access SparkUI

2015-10-26 Thread Earthson
We are using Spark 1.5.1 with `--master yarn`, Yarn RM is running in HA mode.

direct visit




click ApplicationMaster link




YARN RM log






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Client-Can-not-access-SparkUI-tp25197.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Yarn-Client]Can not access SparkUI

2015-10-26 Thread syepes
Hello Earthson,

Is your cluster multihomed?

If yes, try setting the variables SPARK_LOCAL_{IP,HOSTNAME} I had this issue
before: https://issues.apache.org/jira/browse/SPARK-11147



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Client-Can-not-access-SparkUI-tp25197p25199.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Large number of conf broadcasts

2015-10-26 Thread Anders Arpteg
Nice Koert, lets hope it gets merged soon.

/Anders

On Fri, Oct 23, 2015 at 6:32 PM Koert Kuipers  wrote:

> https://github.com/databricks/spark-avro/pull/95
>
> On Fri, Oct 23, 2015 at 5:01 AM, Koert Kuipers  wrote:
>
>> oh no wonder... it undoes the glob (i was reading from /some/path/*),
>> creates a hadoopRdd for every path, and then creates a union of them using
>> UnionRDD.
>>
>> that's not what i want... no need to do union. AvroInputFormat already has
>> the ability to handle globs (or multiple paths, comma separated) very
>> efficiently. AvroRelation should just pass the paths (comma separated).
>>
>>
>>
>>
>> On Thu, Oct 22, 2015 at 1:37 PM, Anders Arpteg 
>> wrote:
>>
>>> Yes, seems unnecessary. I actually tried patching the
>>> com.databricks.spark.avro reader to only broadcast once per dataset,
>>> instead of every single file/partition. It seems to work just as well, and
>>> there are significantly fewer broadcasts, and I'm not seeing out of memory
>>> issues any more. Strange that more people don't react to this, since the
>>> broadcasting seems completely unnecessary...
>>>
>>> Best,
>>> Anders
>>>
>>>
>>> On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers  wrote:
>>>
 i am seeing the same thing. it's gone completely crazy creating
 broadcasts for the last 15 mins or so. killing it...

 On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg 
 wrote:

> Hi,
>
> Running spark 1.5.0 in yarn-client mode, and I am curious why there
> are so many broadcasts being done when loading datasets with a large number
> of partitions/files. I have datasets with thousands of partitions, i.e. hdfs
> files in the avro folder, and sometimes load hundreds of these large
> datasets. I believe I have located the broadcast to line
> SparkContext.scala:1006. It seems to just broadcast the hadoop
> configuration, and I don't see why it should be necessary to broadcast
> that for EVERY file. Wouldn't it be possible to reuse the same broadcast
> configuration? It's hardly the case that the configuration would be different
> between each file in a single dataset. It seems to waste lots of memory
> and needs to persist unnecessarily to disk (see below again).
>
> Thanks,
> Anders
>
> 15/09/24 17:11:11 INFO BlockManager: Writing block
> broadcast_1871_piece0 to disk
>  [19/49086]15/09/24 17:11:11 INFO BlockManagerInfo: Added
> broadcast_1871_piece0 on disk on 10.254.35.24:49428 (size: 23.1 KB)
> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored
> as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0
> in memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803
> from hadoopFile at AvroRelation.scala:121
> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
> threshold of 1024.0 KB for computing block broadcast_4804 in memory
> .
> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
> broadcast_4804 in memory! (computed 496.0 B so far)
> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) +
> 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
> limit = 530.3 MB.
> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to
> disk instead.
> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
> curMem=556036460, maxMem=556038881
> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
> 15/09/24 17:11:11 INFO BlockManager: Dropping block
> broadcast_1872_piece0 from memory
> 15/09/24 17:11:11 INFO BlockManager: Writing block
> broadcast_1872_piece0 to disk
>
>


>>
>


Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Jinfeng Li
I cat /proc/net/dev and then take the difference of received bytes before
and after the job. I also see a long-time peak (nearly 600Mb/s) in nload
interface.  We have 18 machines and each machine receives 4.7G bytes.

On Mon, Oct 26, 2015 at 5:00 PM Sean Owen  wrote:

> -dev +user
> How are you measuring network traffic?
> It's not in general true that there will be zero network traffic, since
> not all executors are local to all data. That can be the situation in many
> cases but not always.
>
> On Mon, Oct 26, 2015 at 8:57 AM, Jinfeng Li  wrote:
>
>> Hi, I find that loading files from HDFS can incur huge amount of network
>> traffic. Input size is 90G and network traffic is about 80G. By my
>> understanding, local files should be read and thus no network communication
>> is needed.
>>
>> I use Spark 1.5.1, and the following is my code:
>>
>> val textRDD = sc.textFile("hdfs://master:9000/inputDir")
>> textRDD.count
>>
>> Jeffrey
>>
>
>


Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Sean Owen
Have a look at your HDFS replication, and where the blocks are for these
files. For example, if you had only 2 HDFS data nodes, then data would be
remote to 16 of 18 workers and always entail a copy.

On Mon, Oct 26, 2015 at 9:12 AM, Jinfeng Li  wrote:

> I cat /proc/net/dev and then take the difference of received bytes before
> and after the job. I also see a long-time peak (nearly 600Mb/s) in nload
> interface.  We have 18 machines and each machine receives 4.7G bytes.
>
> On Mon, Oct 26, 2015 at 5:00 PM Sean Owen  wrote:
>
>> -dev +user
>> How are you measuring network traffic?
>> It's not in general true that there will be zero network traffic, since
>> not all executors are local to all data. That can be the situation in many
>> cases but not always.
>>
>> On Mon, Oct 26, 2015 at 8:57 AM, Jinfeng Li  wrote:
>>
>>> Hi, I find that loading files from HDFS can incur huge amount of network
>>> traffic. Input size is 90G and network traffic is about 80G. By my
>>> understanding, local files should be read and thus no network communication
>>> is needed.
>>>
>>> I use Spark 1.5.1, and the following is my code:
>>>
>>> val textRDD = sc.textFile("hdfs://master:9000/inputDir")
>>> textRDD.count
>>>
>>> Jeffrey
>>>
>>
>>


Re: get host from rdd map

2015-10-26 Thread Deenar Toraskar
You can call any API that returns the hostname in your map function. Here's a
simplified example. You would generally use mapPartitions, as it saves the
overhead of retrieving the hostname multiple times:

import scala.sys.process._

val distinctHosts = sc.parallelize(0 to 100).map { _ =>
  val hostname = ("hostname".!!).trim
  // your code
  (hostname)
}.collect.distinct
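
The mapPartitions variant mentioned above could look like this (a sketch, 
assuming an existing SparkContext `sc`):

import scala.sys.process._

val hostsPerPartition = sc.parallelize(0 to 100, 8).mapPartitions { iter =>
  val hostname = ("hostname".!!).trim   // resolved once per partition
  iter.map(i => (hostname, i))
}
val hostsSeen = hostsPerPartition.keys.distinct.collect()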


On 24 October 2015 at 01:41, weoccc  wrote:

> yea,
>
> my use cases is that i want to have some external communications where rdd
> is being run in map. The external communication might be handled separately
> transparent to spark.  What will be the hacky way and nonhacky way to do
> that ? :)
>
> Weide
>
>
>
> On Fri, Oct 23, 2015 at 5:32 PM, Ted Yu  wrote:
>
>> Can you outline your use case a bit more ?
>>
>> Do you want to know all the hosts which would run the map ?
>>
>> Cheers
>>
>> On Fri, Oct 23, 2015 at 5:16 PM, weoccc  wrote:
>>
>>> in rdd map function, is there a way i can know the list of host names
>>> where the map runs ? any code sample would be appreciated ?
>>>
>>> thx,
>>>
>>> Weide
>>>
>>>
>>>
>>
>


Re: Spark scala REPL - Unable to create sqlContext

2015-10-26 Thread Deenar Toraskar
Embedded Derby, which Hive/Spark SQL uses as the default metastore only
supports a single user at a time. Till this issue is fixed, you could use
another metastore that supports multiple concurrent users (e.g. networked
derby or mysql) to get around it.

On 25 October 2015 at 16:15, Ge, Yao (Y.)  wrote:

> Thanks. I wonder why this is not widely reported in the user forum. The
> REPL shell is basically broken in 1.5.0 and 1.5.1
>
> -Yao
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Sunday, October 25, 2015 12:01 PM
> *To:* Ge, Yao (Y.)
> *Cc:* user
> *Subject:* Re: Spark scala REPL - Unable to create sqlContext
>
>
>
> Have you taken a look at the fix for SPARK-11000 which is in the upcoming
> 1.6.0 release ?
>
>
>
> Cheers
>
>
>
> On Sun, Oct 25, 2015 at 8:42 AM, Yao  wrote:
>
> I have not been able to start the Spark Scala shell since 1.5, as it is not
> able to create the sqlContext during startup. It complains that the
> metastore_db is already locked: "Another instance of Derby may have already
> booted the database". The Derby log is attached.
>
> I only have this problem with starting the shell in yarn-client mode. I am
> working with HDP2.2.6 which runs Hadoop 2.6.
>
> -Yao derby.log
>  >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-scala-REPL-Unable-to-create-sqlContext-tp25195.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: [SPARK MLLIB] could not understand the wrong and inscrutable result of Linear Regression codes

2015-10-26 Thread Zhiliang Zhu
Hi Meihua,

I just found that setFitIntercept(false) was introduced in Spark 1.5.0, and my 
current version is 1.4.0. I shall try that after updating the version.

Since you said breeze can probably be used, and I know breeze is used 
underneath Spark ML, would you comment some more on how to use it here to 
solve systems of linear equations...?

Thank you very much~
Zhiliang





 On Monday, October 26, 2015 2:58 PM, Meihua Wu 
 wrote:
   

 please add "setFitIntercept(false)" to your LinearRegression.

LinearRegression by default includes an intercept in the model, e.g.
label = intercept + features dot weight

To get the result you want, you need to force the intercept to be zero.

Just curious, are you trying to solve systems of linear equations? If
so, you can probably try breeze.



On Sun, Oct 25, 2015 at 9:10 PM, Zhiliang Zhu
 wrote:
>
>
>
> On Monday, October 26, 2015 11:26 AM, Zhiliang Zhu
>  wrote:
>
>
> Hi DB Tsai,
>
> Thanks very much for your kind help. I  get it now.
>
> I am sorry that there is another issue, the weight/coefficient result is
> perfect while A is triangular matrix, however, while A is not triangular
> matrix (but
> transformed from triangular matrix, still is invertible), the result seems
> not perfect and difficult to make it better by resetting the parameter.
> Would you help comment some about that...
>
> List localTraining = Lists.newArrayList(
>      new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)),
>      new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)),
>      new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)),
>      new LabeledPoint(-3.0, Vectors.dense(0.0, 0.0, -1.0, 0.0)));
> ...
> LinearRegression lr = new LinearRegression()
>      .setMaxIter(2)
>      .setRegParam(0)
>      .setElasticNetParam(0);
> 
>
> --
>
> It seems that no matter how to reset the parameters for lr , the output of
> x3 and x4 is always nearly the same .
> Whether there is some way to make the result a little better...
>
>
> --
>
> x3 and x4 could not become better, the output is:
> Final w:
> [0.999477672867,1.999748740578,3.500112393734,3.50011239377]
>
> Thank you,
> Zhiliang
>
>
>
> On Monday, October 26, 2015 10:25 AM, DB Tsai  wrote:
>
>
> Column 4 is always constant, so no predictive power resulting zero weight.
>
> On Sunday, October 25, 2015, Zhiliang Zhu  wrote:
>
> Hi DB Tsai,
>
> Thanks very much for your kind reply help.
>
> As for your comment, I just modified and tested the key part of the codes:
>
>  LinearRegression lr = new LinearRegression()
>        .setMaxIter(1)
>        .setRegParam(0)
>        .setElasticNetParam(0);  //the number could be reset
>
>  final LinearRegressionModel model = lr.fit(training);
>
> Now the output is much reasonable, however, x4 is always 0 while repeatedly
> reset those parameters in lr , would you help some about it how to properly
> set the parameters ...
>
> Final w: [1.00127825909,1.99979185054,2.3307136,0.0]
>
> Thank you,
> Zhiliang
>
>
>
>
> On Monday, October 26, 2015 5:14 AM, DB Tsai  wrote:
>
>
> LinearRegressionWithSGD is not stable. Please use linear regression in
> ML package instead.
> http://spark.apache.org/docs/latest/ml-linear-methods.html
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
>
> On Sun, Oct 25, 2015 at 10:14 AM, Zhiliang Zhu
>  wrote:
>> Dear All,
>>
>> I have some program as below which makes me very much confused and
>> inscrutable, it is about multiple dimension linear regression mode, the
>> weight / coefficient is always perfect while the dimension is smaller than
>> 4, otherwise it is wrong all the time.
>> Or, whether the LinearRegressionWithSGD would be selected for another one?
>>
>> public class JavaLinearRegression {
>>  public static void main(String[] args) {
>>    SparkConf conf = new SparkConf().setAppName("Linear Regression
>> Example");
>>    JavaSparkContext sc = new JavaSparkContext(conf);
>>    SQLContext jsql = new SQLContext(sc);
>>
>>    //Ax = b, x = [1, 2, 3, 4] would be the only one output about weight
>>    //x1 + 2 * x2 + 3 * x3 + 4 * x4 = y would be the multiple linear mode
>>    List localTraining = Lists.newArrayList(
>>        new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)),
>>        new 

Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Jinfeng Li
The input data is a number of 16M files.

On Mon, Oct 26, 2015 at 5:12 PM Jinfeng Li  wrote:

> I cat /proc/net/dev and then take the difference of received bytes before
> and after the job. I also see a long-time peak (nearly 600Mb/s) in nload
> interface.  We have 18 machines and each machine receives 4.7G bytes.
>
> On Mon, Oct 26, 2015 at 5:00 PM Sean Owen  wrote:
>
>> -dev +user
>> How are you measuring network traffic?
>> It's not in general true that there will be zero network traffic, since
>> not all executors are local to all data. That can be the situation in many
>> cases but not always.
>>
>> On Mon, Oct 26, 2015 at 8:57 AM, Jinfeng Li  wrote:
>>
>>> Hi, I find that loading files from HDFS can incur huge amount of network
>>> traffic. Input size is 90G and network traffic is about 80G. By my
>>> understanding, local files should be read and thus no network communication
>>> is needed.
>>>
>>> I use Spark 1.5.1, and the following is my code:
>>>
>>> val textRDD = sc.textFile("hdfs://master:9000/inputDir")
>>> textRDD.count
>>>
>>> Jeffrey
>>>
>>
>>


Kryo makes String data invalid

2015-10-26 Thread Saif.A.Ellafi
Hi all,

I have a parquet file which I am loading in a shell. When I launch the shell 
with --driver-java-options="-Dspark.serializer=...kryo", a couple of fields 
look like:

03-?? ??-?? ??-???
when calling > data.first

I will confirm briefly, but I am utterly sure it happens only on StringType 
fields.

Why could this be happening? Perhaps Kryo wasn't set up when the parquet file
was created from Spark?

If I disable Kryo, data looks good.

Any ideas?
Saif



RE: Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, Spark 1.5.1)

2015-10-26 Thread Silvio Fiorito
Hi Matthias,

Unless there was a change in 1.5, I'm afraid dynamic resource allocation is not 
yet supported in streaming apps.

Thanks,
Silvio

Sent from my Lumia 930

From: Matthias Niehoff
Sent: ‎10/‎26/‎2015 4:00 PM
To: user@spark.apache.org
Subject: Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, 
Spark 1.5.1)

Hello everybody,

I have a few (~15) Spark Streaming jobs which have load peaks as well as long 
times with a low load. So I thought the new Dynamic Resource Allocation for 
Standalone Clusters might be helpful (SPARK-4751).

I have a test "cluster" with 1 worker consisting of 4 executors with 2 cores 
each, so 8 cores in total.

I started a simple streaming application without limiting the max cores for 
this app. As expected the app occupied every core of the cluster. Then I 
started a second app, also without limiting the maximum cores. As the first app 
did not get any input through the stream, my naive expectation was that the 
second app would get at least 2 cores (1 receiver, 1 processing), but that's 
not what happened. The cores are still assigned to the first app.
When I look at the application UI of the first app every executor is still 
running. That explains why no executor is used for the second app.

I end up with two questions:
- When does an executor become idle in a Spark Streaming application (and so 
could be reassigned to another app)?
- Is there another way to cope with uncertain load when using Spark 
Streaming applications? I already combined multiple jobs into one Spark application 
using different threads, but this approach is reaching its limit for me, because 
the Spark applications get too big to manage.

Thank You!




Joining large data sets

2015-10-26 Thread Bryan
Hello.

What is the suggested practice for joining two large data streams? I am 
currently simply mapping out the key tuple on both streams then executing a 
join.

I have seen several suggestions for broadcast joins that seem to be targeted at 
joining a larger data set to a small set (broadcasting the smaller set).

For joining two large datasets, it would seem better to repartition both 
sets in the same way and then join each partition. Is there a suggested practice 
for this problem?
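At the RDD level, the co-partitioning idea above can be sketched roughly like this (key/value types and names are placeholders):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Partition both keyed RDDs with the same partitioner up front so the join
// itself does not have to reshuffle either side.
def coPartitionedJoin(left: RDD[(String, Int)],
                      right: RDD[(String, Int)],
                      numPartitions: Int): RDD[(String, (Int, Int))] = {
  val partitioner = new HashPartitioner(numPartitions)
  val l = left.partitionBy(partitioner)
  val r = right.partitionBy(partitioner)
  l.join(r)  // inputs share a partitioner, so the join adds no extra shuffle
}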

Thank you,

Bryan Jeffrey

Re: Broadcast table

2015-10-26 Thread Jags Ramnarayanan
If you are using Spark SQL and joining two DataFrames, the optimizer would
automatically broadcast the smaller table (you can configure the size threshold if
the default is too small).

Else, in code, you can collect any RDD to the driver and broadcast using
the context.broadcast method.
http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
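A rough sketch of both options (the threshold value and RDD names are only examples; sc and sqlContext are assumed to be in scope):

// Option 1: let Spark SQL broadcast the smaller side automatically by raising
// the size threshold (in bytes).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)

// Option 2: collect a small RDD to the driver and broadcast it by hand.
val lookup = sc.broadcast(smallRdd.collect().toMap)   // smallRdd: RDD[(String, String)], assumed
val joined = largeRdd.map { case (k, v) => (k, v, lookup.value.get(k)) }  // largeRdd is assumed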

-- Jags
(www.snappydata.io)


On Mon, Oct 26, 2015 at 11:17 AM, Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Hi all,
>
>
>
> I use the thrift server, and I cache a table using “cache table mytab”.
>
> Is there any sql to broadcast it too?
>
>
>
> *Thanks*
>
> *Younes Naguib*
>
> Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
>
> Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | younes.naguib
> @tritondigital.com 
>
>
>


Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, Spark 1.5.1)

2015-10-26 Thread Matthias Niehoff
Hello everybody,

I have a few (~15) Spark Streaming jobs which have load peaks as well as
long times with a low load. So I thought the new Dynamic Resource
Allocation for Standalone Clusters might be helpful (SPARK-4751).

I have a test "cluster" with 1 worker consisting of 4 executors with 2
cores each, so 8 cores in total.

I started a simple streaming application without limiting the max cores for
this app. As expected the app occupied every core of the cluster. Then I
started a second app, also without limiting the maximum cores. As the first
app did not get any input through the stream, my naive expectation was that
the second app would get at least 2 cores (1 receiver, 1 processing), but
that's not what happened. The cores are still assigned to the first app.
When I look at the application UI of the first app every executor is still
running. That explains why no executor is used for the second app.

I end up with two questions:
- When does an executor become idle in a Spark Streaming application (and
so could be reassigned to another app)?
- Is there another way to cope with uncertain load when using Spark
Streaming applications? I already combined multiple jobs into one Spark
application using different threads, but this approach is reaching its limit for
me, because the Spark applications get too big to manage.

Thank You!


Custom function to operate on Dataframe Window

2015-10-26 Thread aaryabhatta
Hi,

Is there a way to create a custom function in PySpark to operate on a
DataFrame window? For example, similar to the rank() function that outputs the
rank within that window.

If it can only be done in Scala / Java, may I know how?
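For the Scala / Java route, the built-in window functions in Spark 1.4/1.5 look roughly like this (a HiveContext is required for window functions; df, "dept" and "salary" are assumed names). A fully custom aggregate over a window is, as far as I know, not straightforward in 1.5, but the built-ins (rank, row_number, lag, lead, ...) cover many cases:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank rows within each department by descending salary (df is an assumed DataFrame).
val w = Window.partitionBy("dept").orderBy(desc("salary"))
val ranked = df.withColumn("rank", rank().over(w))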

Thanks.

Regards





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-function-to-operate-on-Dataframe-Window-tp25205.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Submitting Spark Applications - Do I need to leave ports open?

2015-10-26 Thread markluk
I want to submit interactive applications to a remote Spark cluster running
in standalone mode. 

I understand I need to connect to the master's port 7077. It also seems like the
master node needs to open connections back to my local machine, and the ports
it uses are different every time.

If I have a firewall enabled on my local machine, spark-submit fails since the
ports it needs to open on my local machine are unreachable, so spark-submit
fails to connect to the master.

I was able to get it to work if I disable the firewall on my local machine. But
that's not a real solution.

Is there some config that I'm not aware of that solves this problem?
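One approach, sketched below under the assumption that Spark 1.5's networking properties are in play, is to pin the driver-side ports to fixed values and open only those in the local firewall; the port numbers and hostname are arbitrary examples:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.port", "51000")        // RPC port the cluster connects back to
  .set("spark.fileserver.port", "51100")    // driver-side HTTP file server
  .set("spark.blockManager.port", "51200")  // block manager port
  .set("spark.driver.host", "my-laptop.example.com")  // assumed reachable hostname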




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-Do-I-need-to-leave-ports-open-tp25207.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Maven build failed (Spark master)

2015-10-26 Thread Kayode Odeyemi
I used this command which is synonymous to what you have:

./make-distribution.sh --name spark-latest --tgz --mvn mvn
-Dhadoop.version=2.6.0 -Phadoop-2.6 -Phive -Phive-thriftserver -DskipTests
clean package -U

But I still see WARNINGS like this in the output and no .gz file created:

cp: /usr/local/spark-latest/spark-[WARNING] See
http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/.part-r-5.gz.parquet.crc:
No such file or directory
cp: /usr/local/spark-latest/spark-[WARNING] See
http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-5.gz.parquet:
No such file or directory
cp: /usr/local/spark-latest/spark-[WARNING] See
http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9:
No such file or directory
cp:
/usr/local/spark-latest/dist/python/test_support/sql/parquet_partitioned/year=2015/month=9:
unable to copy extended attributes to
/usr/local/spark-latest/spark-[WARNING] See
http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9:
No such file or directory
cp: /usr/local/spark-latest/spark-[WARNING] See
http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1:
No such file or directory
cp:
/usr/local/spark-latest/dist/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1:
unable to copy extended attributes to
/usr/local/spark-latest/spark-[WARNING] See
http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1:
No such file or directory
cp: /usr/local/spark-latest/spark-[WARNING] See
http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/.part-r-7.gz.parquet.crc:
No such file or directory

On Mon, Oct 26, 2015 at 8:58 PM, Ted Yu  wrote:

> If you use the command shown in:
> https://github.com/apache/spark/pull/9281
>
> You should have got the following:
>
>
> ./dist/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-8.gz.parquet
>
> ./dist/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-7.gz.parquet
>
> ./dist/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-4.gz.parquet
>
> ./dist/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-2.gz.parquet
>
> ./dist/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-5.gz.parquet
>
> On Mon, Oct 26, 2015 at 11:47 AM, Kayode Odeyemi 
> wrote:
>
>> I see a lot of output like this after a successful Maven build:
>>
>> cp: /usr/local/spark-latest/spark-[WARNING] See
>> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/
>> part-r-8.gz.parquet: No such file or directory
>>
>> Seems it fails when it tries to package the build as an archive.
>>
>> I'm using the latest code on github master.
>>
>> Any ideas please?
>>
>> On Mon, Oct 26, 2015 at 6:20 PM, Yana Kadiyska 
>> wrote:
>>
>>> In 1.4 ./make_distribution produces a .tgz file in the root directory
>>> (same directory that make_distribution is in)
>>>
>>>
>>>
>>> On Mon, Oct 26, 2015 at 8:46 AM, Kayode Odeyemi 
>>> wrote:
>>>
 Hi,

 The ./make_distribution task completed. However, I can't seem to locate
 the
 .tar.gz file.

 Where does Spark save this? or should I just work with the dist
 directory?

 On Fri, Oct 23, 2015 at 4:23 PM, Kayode Odeyemi 
 wrote:

> I saw this when I tested manually (without ./make-distribution)
>
> Detected Maven Version: 3.2.2 is not in the allowed range 3.3.3.
>
> So I simply upgraded maven to 3.3.3.
>
> Resolved. Thanks
>
> On Fri, Oct 23, 2015 at 3:17 PM, Sean Owen  wrote:
>
>> This doesn't show the actual error output from Maven. I have a strong
>> guess that you haven't set MAVEN_OPTS to increase the memory Maven can
>> use.
>>
>> On Fri, Oct 23, 2015 at 6:14 AM, Kayode Odeyemi 
>> wrote:
>> > Hi,
>> >
>> > I can't seem to get a successful maven build. Please see command
>> output
>> > below:
>> >
>> > bash-3.2$ ./make-distribution.sh --name spark-latest --tgz --mvn mvn
>> > -Dhadoop.version=2.7.0 -Phadoop-2.7 -Phive -Phive-thriftserver
>> -DskipTests
>> > clean package
>> > +++ dirname ./make-distribution.sh
>> > ++ cd .
>> > ++ 

Re: Maven build failed (Spark master)

2015-10-26 Thread Ted Yu
Looks like '-Pyarn' was missing in your command.

On Mon, Oct 26, 2015 at 12:06 PM, Kayode Odeyemi  wrote:

> I used this command which is synonymous to what you have:
>
> ./make-distribution.sh --name spark-latest --tgz --mvn mvn
> -Dhadoop.version=2.6.0 -Phadoop-2.6 -Phive -Phive-thriftserver -DskipTests
> clean package -U
>
> But I still see WARNINGS like this in the output and no .gz file created:
>
> cp: /usr/local/spark-latest/spark-[WARNING] See
> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/.part-r-5.gz.parquet.crc:
> No such file or directory
> cp: /usr/local/spark-latest/spark-[WARNING] See
> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-5.gz.parquet:
> No such file or directory
> cp: /usr/local/spark-latest/spark-[WARNING] See
> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9:
> No such file or directory
> cp:
> /usr/local/spark-latest/dist/python/test_support/sql/parquet_partitioned/year=2015/month=9:
> unable to copy extended attributes to
> /usr/local/spark-latest/spark-[WARNING] See
> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9:
> No such file or directory
> cp: /usr/local/spark-latest/spark-[WARNING] See
> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1:
> No such file or directory
> cp:
> /usr/local/spark-latest/dist/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1:
> unable to copy extended attributes to
> /usr/local/spark-latest/spark-[WARNING] See
> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1:
> No such file or directory
> cp: /usr/local/spark-latest/spark-[WARNING] See
> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/.part-r-7.gz.parquet.crc:
> No such file or directory
>
> On Mon, Oct 26, 2015 at 8:58 PM, Ted Yu  wrote:
>
>> If you use the command shown in:
>> https://github.com/apache/spark/pull/9281
>>
>> You should have got the following:
>>
>>
>> ./dist/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-8.gz.parquet
>>
>> ./dist/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-7.gz.parquet
>>
>> ./dist/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-4.gz.parquet
>>
>> ./dist/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-2.gz.parquet
>>
>> ./dist/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-5.gz.parquet
>>
>> On Mon, Oct 26, 2015 at 11:47 AM, Kayode Odeyemi 
>> wrote:
>>
>>> I see a lot of output like this after a successful Maven build:
>>>
>>> cp: /usr/local/spark-latest/spark-[WARNING] See
>>> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/
>>> part-r-8.gz.parquet: No such file or directory
>>>
>>> Seems it fails when it tries to package the build as an archive.
>>>
>>> I'm using the latest code on github master.
>>>
>>> Any ideas please?
>>>
>>> On Mon, Oct 26, 2015 at 6:20 PM, Yana Kadiyska 
>>> wrote:
>>>
 In 1.4 ./make_distribution produces a .tgz file in the root directory
 (same directory that make_distribution is in)



 On Mon, Oct 26, 2015 at 8:46 AM, Kayode Odeyemi 
 wrote:

> Hi,
>
> The ./make_distribution task completed. However, I can't seem to
> locate the
> .tar.gz file.
>
> Where does Spark save this? or should I just work with the dist
> directory?
>
> On Fri, Oct 23, 2015 at 4:23 PM, Kayode Odeyemi 
> wrote:
>
>> I saw this when I tested manually (without ./make-distribution)
>>
>> Detected Maven Version: 3.2.2 is not in the allowed range 3.3.3.
>>
>> So I simply upgraded maven to 3.3.3.
>>
>> Resolved. Thanks
>>
>> On Fri, Oct 23, 2015 at 3:17 PM, Sean Owen 
>> wrote:
>>
>>> This doesn't show the actual error output from Maven. I have a strong
>>> guess that you haven't set MAVEN_OPTS to increase the memory Maven
>>> can
>>> use.
>>>
>>> On Fri, Oct 23, 2015 at 6:14 AM, Kayode Odeyemi 
>>> wrote:
>>> > Hi,
>>> >
>>> > I can't seem to get a successful maven build. Please see command
>>> output

Re: Spark with business rules

2015-10-26 Thread Jörn Franke

Maybe SparkR? What languages do your Users speak? 

> On 26 Oct 2015, at 23:12, danilo  wrote:
> 
> Hi All, I want to create a monitoring tool using my sensor data. I receive
> the events every second and I need to create a report using node.js. Right
> now I created my KPIs by coding the formulas directly in Spark. 
> 
> However, I would like to add a layer where a non-technical user can write
> simple formulas to be interpreted in real time by Spark - the results are
> stored in an in-memory DB.
> 
> Any suggestions?
> 
> thx
> Danilo 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-business-rules-tp25208.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, Spark 1.5.1)

2015-10-26 Thread Ted Yu
This is related:
SPARK-10955 Warn if dynamic allocation is enabled for Streaming jobs

which went into 1.6.0 as well.

FYI

On Mon, Oct 26, 2015 at 2:26 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Matthias,
>
> Unless there was a change in 1.5, I'm afraid dynamic resource allocation
> is not yet supported in streaming apps.
>
> Thanks,
> Silvio
>
> Sent from my Lumia 930
> --
> From: Matthias Niehoff 
> Sent: ‎10/‎26/‎2015 4:00 PM
> To: user@spark.apache.org
> Subject: Dynamic Resource Allocation with Spark Streaming (Standalone
> Cluster, Spark 1.5.1)
>
> Hello everybody,
>
> I have a few (~15) Spark Streaming jobs which have load peaks as well as
> long times with a low load. So I thought the new Dynamic Resource
> Allocation for Standalone Clusters might be helpful (SPARK-4751).
>
> I have a test "cluster" with 1 worker consisting of 4 executors with 2
> cores each, so 8 cores in total.
>
> I started a simple streaming application without limiting the max cores
> for this app. As expected the app occupied every core of the cluster. Then
> I started a second app, also without limiting the maximum cores. As the
> first app did not get any input through the stream, my naive expectation
> was that the second app would get at least 2 cores (1 receiver, 1
> processing), but that's not what happened. The cores are still assigned to
> the first app.
> When I look at the application UI of the first app every executor is still
> running. That explains why no executor is used for the second app.
>
> I end up with two questions:
> - When does an executor become idle in a Spark Streaming application?
> (and so could be reassigned to another app)
> - Is there another way to cope with uncertain load when using Spark
> Streaming applications? I already combined multiple jobs into one Spark
> application using different threads, but this approach is reaching its limit for
> me, because the Spark applications get too big to manage.
>
> Thank You!
>
>
>


Re: Spark with business rules

2015-10-26 Thread Holden Karau
Spark SQL seems like it might be the best interface if your users are
already familiar with SQL.
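For example, a rough sketch of that approach (the table, columns and formula are made up):

// Register the sensor data and evaluate a user-supplied formula as plain SQL.
sensorDF.registerTempTable("events")                    // sensorDF: DataFrame, assumed
val userFormula = "avg(temperature) > 30"               // written by a non-technical user
val kpi = sqlContext.sql(
  s"SELECT device, $userFormula AS alert FROM events GROUP BY device")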

On Mon, Oct 26, 2015 at 3:12 PM, danilo  wrote:

> Hi All, I want to create a monitoring tool using my sensor data. I receive
> the events every second and I need to create a report using node.js. Right
> now I created my KPIs by coding the formulas directly in Spark.
>
> However, I would like to add a layer where a non-technical user can write
> simple formulas to be interpreted in real time by Spark - the results
> are
> stored in an in-memory DB.
>
> Any suggestions?
>
> thx
> Danilo
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-business-rules-tp25208.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Spark Implementation of XGBoost

2015-10-26 Thread DB Tsai
Interesting. For feature sub-sampling, is it per-node or per-tree? Do
you think you can implement generic GBM and have it merged as part of
Spark codebase?

Sincerely,

DB Tsai
--
Web: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Mon, Oct 26, 2015 at 11:42 AM, Meihua Wu
 wrote:
> Hi Spark User/Dev,
>
> Inspired by the success of XGBoost, I have created a Spark package for
> gradient boosting tree with 2nd order approximation of arbitrary
> user-defined loss functions.
>
> https://github.com/rotationsymmetry/SparkXGBoost
>
> Currently linear (normal) regression, binary classification, Poisson
> regression are supported. You can extend with other loss function as
> well.
>
> L1, L2, bagging, feature sub-sampling are also employed to avoid overfitting.
>
> Thank you for testing. I am looking forward to your comments and
> suggestions. Bugs or improvements can be reported through GitHub.
>
> Many thanks!
>
> Meihua
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Secondary Sorting in Spark

2015-10-26 Thread swetha kasireddy
Right now my code does the following for grouping by sessionId (which is the
key) and sorting by timestamp, which is the first value in the tuple. The
second value in the tuple is JSON.


def getGrpdAndSrtdSessions(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
  val grpdSessions = rdd.groupByKey()
  val srtdSessions = grpdSessions.mapValues[List[(Long, String)]](iter => iter.toList.sortBy(_._1))
  srtdSessions
}


Based on the above blog post, should it be something like the following to
avoid a shuffle?

1. Create a class that has sessionId and timeStamp as the fields and use it
as the key.
2. The value will be my list of Json Strings which is the second field in
the tuple.
3. Create a custom partitioner that chooses the partition based on session
id.
4. Write an implicit ordering function for the key that does the ordering by
sessionId and timeStamp.
5. And then do repartitionAndSortWithinPartitions.


In this scenario, the code does the same thing as groupByKey and sortBy,
correct?

What if I want to reduce the shuffling when I do a reduceByKey? Do I
just use a custom partitioner and then do reduceByKey? Does using a custom
partitioner before a reduceByKey improve performance?
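A rough sketch of the five steps above (names and the partition count are illustrative):

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Composite key: partition on sessionId only, order by (sessionId, timestamp).
case class SessionKey(sessionId: String, timestamp: Long)

object SessionKey {
  implicit val ordering: Ordering[SessionKey] =
    Ordering.by((k: SessionKey) => (k.sessionId, k.timestamp))
}

class SessionPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case SessionKey(sessionId, _) =>
      val mod = sessionId.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod
  }
}

def groupedAndSorted(rdd: RDD[(String, (Long, String))],
                     partitions: Int): RDD[(SessionKey, String)] = {
  rdd.map { case (sessionId, (ts, json)) => (SessionKey(sessionId, ts), json) }
    .repartitionAndSortWithinPartitions(new SessionPartitioner(partitions))
}

This still shuffles once to move the data, but the sort happens as part of that shuffle rather than in memory per key after a groupByKey.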

On Mon, Oct 26, 2015 at 2:51 AM, Adrian Tanase  wrote:

> Do you have a particular concern? You’re always using a partitioner
> (default is HashPartitioner) and the Partitioner interface is pretty light,
> can’t see how it could affect performance.
>
> Used correctly it should improve performance as you can better control
> placement of data and avoid shuffling…
>
> -adrian
>
> From: swetha kasireddy
> Date: Monday, October 26, 2015 at 6:56 AM
> To: Adrian Tanase
> Cc: Bill Bejeck, "user@spark.apache.org"
> Subject: Re: Secondary Sorting in Spark
>
> Hi,
>
> Does the use of custom partitioner in Streaming affect performance?
>
> On Mon, Oct 5, 2015 at 1:06 PM, Adrian Tanase  wrote:
>
>> Great article, especially the use of a custom partitioner.
>>
>> Also, sorting by multiple fields by creating a tuple out of them is an
>> awesome, easy to miss, Scala feature.
>>
>> Sent from my iPhone
>>
>> On 04 Oct 2015, at 21:41, Bill Bejeck  wrote:
>>
>> I've written blog post on secondary sorting in Spark and I'd thought I'd
>> share it with the group
>>
>> http://codingjunkie.net/spark-secondary-sort/
>>
>> Thanks,
>> Bill
>>
>>
>


Re: Spark Implementation of XGBoost

2015-10-26 Thread DB Tsai
Also, does it support categorical feature?

Sincerely,

DB Tsai
--
Web: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Mon, Oct 26, 2015 at 4:06 PM, DB Tsai  wrote:
> Interesting. For feature sub-sampling, is it per-node or per-tree? Do
> you think you can implement generic GBM and have it merged as part of
> Spark codebase?
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
>
> On Mon, Oct 26, 2015 at 11:42 AM, Meihua Wu
>  wrote:
>> Hi Spark User/Dev,
>>
>> Inspired by the success of XGBoost, I have created a Spark package for
>> gradient boosting tree with 2nd order approximation of arbitrary
>> user-defined loss functions.
>>
>> https://github.com/rotationsymmetry/SparkXGBoost
>>
>> Currently linear (normal) regression, binary classification, Poisson
>> regression are supported. You can extend with other loss function as
>> well.
>>
>> L1, L2, bagging, feature sub-sampling are also employed to avoid overfitting.
>>
>> Thank you for testing. I am looking forward to your comments and
>> suggestions. Bugs or improvements can be reported through GitHub.
>>
>> Many thanks!
>>
>> Meihua
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Implementation of XGBoost

2015-10-26 Thread YiZhi Liu
There's an xgboost exploration jira SPARK-8547. Can it be a good start?

2015-10-27 7:07 GMT+08:00 DB Tsai :
> Also, does it support categorical feature?
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
>
> On Mon, Oct 26, 2015 at 4:06 PM, DB Tsai  wrote:
>> Interesting. For feature sub-sampling, is it per-node or per-tree? Do
>> you think you can implement generic GBM and have it merged as part of
>> Spark codebase?
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Web: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>>
>>
>> On Mon, Oct 26, 2015 at 11:42 AM, Meihua Wu
>>  wrote:
>>> Hi Spark User/Dev,
>>>
>>> Inspired by the success of XGBoost, I have created a Spark package for
>>> gradient boosting tree with 2nd order approximation of arbitrary
>>> user-defined loss functions.
>>>
>>> https://github.com/rotationsymmetry/SparkXGBoost
>>>
>>> Currently linear (normal) regression, binary classification, Poisson
>>> regression are supported. You can extend with other loss function as
>>> well.
>>>
>>> L1, L2, bagging, feature sub-sampling are also employed to avoid 
>>> overfitting.
>>>
>>> Thank you for testing. I am looking forward to your comments and
>>> suggestions. Bugs or improvements can be reported through GitHub.
>>>
>>> Many thanks!
>>>
>>> Meihua
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark with business rules

2015-10-26 Thread danilo
Hi All, I want to create a monitoring tool using my sensor data. I receive
the events every second and I need to create a report using node.js. Right
now I created my KPIs by coding the formulas directly in Spark. 

However, I would like to add a layer where a non-technical user can write
simple formulas to be interpreted in real time by Spark - the results are
stored in an in-memory DB.

Any suggestions?

thx
Danilo 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-business-rules-tp25208.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Results change in group by operation

2015-10-26 Thread Saif.A.Ellafi
Hello Everyone,

I would need urgent help with a data consistency issue I am having.
Standalone cluster of five servers. sqlContext is an instance of HiveContext 
(default in spark-shell).
No special options other than driver memory and executor memory.
Parquet partitions are 512 where there are 160 cores.
Data is nearly 2 billion rows.
The issue happens as follows:

val data = sqlContext.read.parquet("/var/Saif/data_pqt")

val res = data.groupBy("product", "band", "age", "vint", "mb", 
"mm").agg(count($"account_id").as("N"), sum($"balance").as("balance_eom"), 
sum($"balancec").as("balance_eoc"), sum($"spend").as("spend"), 
sum($"payment").as("payment"), sum($"feoc").as("feoc"), 
sum($"cfintbal").as("cfintbal"), count($"newacct" === 
1).as("newacct")).persist()

val z = res.select("vint", "mm").filter("vint = 
'2007-01-01'").select("mm").distinct.collect

z.length

>>> res0: Int = 102

res.unpersist()

val z = res.select("vint", "mm").filter("vint = 
'2007-01-01'").select("mm").distinct.collect

z.length

>>> res1: Int = 103

Please help,
Saif







RE: HiveContext ignores ("skip.header.line.count"="1")

2015-10-26 Thread Cheng, Hao
I am not sure if we really want to support that with HiveContext, but a 
workaround is to use the Spark package at https://github.com/databricks/spark-csv
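For example, something along these lines (the path is illustrative; the shell would be launched with --packages com.databricks:spark-csv_2.10:1.2.0 or a similar version):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // treat the first line as a header and skip it
  .option("inferSchema", "true")
  .load("/path/to/mytab.csv")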


From: Felix Cheung [mailto:felixcheun...@hotmail.com]
Sent: Tuesday, October 27, 2015 10:54 AM
To: Daniel Haviv; user
Subject: RE: HiveContext ignores ("skip.header.line.count"="1")

Please open a JIRA?



Date: Mon, 26 Oct 2015 15:32:42 +0200
Subject: HiveContext ignores ("skip.header.line.count"="1")
From: daniel.ha...@veracity-group.com
To: user@spark.apache.org
Hi,
I have a csv table in Hive which is configured to skip the header row using 
TBLPROPERTIES("skip.header.line.count"="1").
When querying from Hive the header row is not included in the data, but when 
running the same query via HiveContext I get the header row.

I made sure that HiveContext sees the skip.header.line.count setting by running 
"show create table"

Any ideas?

Thank you.
Daniel


RE: Concurrent execution of actions within a driver

2015-10-26 Thread Silvio Fiorito
There is a collectAsync action if you want to run them in parallel, but keep in 
mind the two jobs will need to share resources and you should use the FAIR 
scheduler.
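A rough sketch of that approach (app name and paths are placeholders):

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("concurrent-actions")
  .set("spark.scheduler.mode", "FAIR")   // let the two jobs share executors fairly
val sc = new SparkContext(conf)

val rdd1 = sc.textFile("path1")
val rdd2 = sc.textFile("path2")

// Both jobs are submitted immediately; block for the results afterwards.
val f1 = rdd1.collectAsync()
val f2 = rdd2.collectAsync()
val (res1, res2) = (Await.result(f1, Duration.Inf), Await.result(f2, Duration.Inf))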

From: praveen S
Sent: ‎10/‎26/‎2015 4:27 AM
To: user@spark.apache.org
Subject: Concurrent execution of actions within a driver


Does spark run different actions of an rdd within a driver in parallel also?

Let's say
class Driver{

val rdd1= sc. textFile("... ")
val rdd2=sc.textFile("")
rdd1. collect //Action 1
rdd2. collect //Action 2

}

Does Spark run Action 1 & 2 in parallel? (Some kind of a pass through the 
driver code and then start the execution?)

If not, then is using threads safe for independent actions/RDDs?


Re: Spark Implementation of XGBoost

2015-10-26 Thread Meihua Wu
Hi YiZhi,

Thank you for mentioning the jira. I will add a note to the jira.

Meihua

On Mon, Oct 26, 2015 at 6:16 PM, YiZhi Liu  wrote:
> There's an xgboost exploration jira SPARK-8547. Can it be a good start?
>
> 2015-10-27 7:07 GMT+08:00 DB Tsai :
>> Also, does it support categorical feature?
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Web: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>>
>>
>> On Mon, Oct 26, 2015 at 4:06 PM, DB Tsai  wrote:
>>> Interesting. For feature sub-sampling, is it per-node or per-tree? Do
>>> you think you can implement generic GBM and have it merged as part of
>>> Spark codebase?
>>>
>>> Sincerely,
>>>
>>> DB Tsai
>>> --
>>> Web: https://www.dbtsai.com
>>> PGP Key ID: 0xAF08DF8D
>>>
>>>
>>> On Mon, Oct 26, 2015 at 11:42 AM, Meihua Wu
>>>  wrote:
 Hi Spark User/Dev,

 Inspired by the success of XGBoost, I have created a Spark package for
 gradient boosting tree with 2nd order approximation of arbitrary
 user-defined loss functions.

 https://github.com/rotationsymmetry/SparkXGBoost

 Currently linear (normal) regression, binary classification, Poisson
 regression are supported. You can extend with other loss function as
 well.

 L1, L2, bagging, feature sub-sampling are also employed to avoid 
 overfitting.

 Thank you for testing. I am looking forward to your comments and
 suggestions. Bugs or improvements can be reported through GitHub.

 Many thanks!

 Meihua

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>
> --
> Yizhi Liu
> Senior Software Engineer / Data Mining
> www.mvad.com, Shanghai, China

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: HiveContext ignores ("skip.header.line.count"="1")

2015-10-26 Thread Daniel Haviv
I will

Thank you.

> On 27 באוק׳ 2015, at 4:54, Felix Cheung  wrote:
> 
> Please open a JIRA?
> 
>  
> Date: Mon, 26 Oct 2015 15:32:42 +0200
> Subject: HiveContext ignores ("skip.header.line.count"="1")
> From: daniel.ha...@veracity-group.com
> To: user@spark.apache.org
> 
> Hi,
> I have a csv table in Hive which is configured to skip the header row using 
> TBLPROPERTIES("skip.header.line.count"="1").
> When querying from Hive the header row is not included in the data, but when 
> running the same query via HiveContext I get the header row.
> 
> I made sure that HiveContext sees the skip.header.line.count setting by 
> running "show create table"
> 
> Any ideas?
> 
> Thank you.
> Daniel


Re: Spark Implementation of XGBoost

2015-10-26 Thread Meihua Wu
Hi DB Tsai,

Thank you very much for your interest and comment.

1) feature sub-sample is per-node, like random forest.

2) The current code heavily exploits the tree structure to speed up
the learning (such as processing multiple learning nodes in one pass of
the training data). So a generic GBM is likely to be a different
codebase. Do you have any nice reference of efficient GBM? I am more
than happy to look into that.

3) The algorithm accept training data as a DataFrame with the
featureCol indexed by VectorIndexer. You can specify which variable is
categorical in the VectorIndexer. Please note that currently all
categorical variables are treated as ordered. If you want some
categorical variables as unordered, you can pass the data through
OneHotEncoder before the VectorIndexer. I do have a plan to handle
unordered categorical variable using the approach in RF in Spark ML
(Please see roadmap in the README.md)
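For illustration, a minimal sketch of the VectorIndexer step described in (3) (column names and the maxCategories value are only examples; "data" is an assumed DataFrame with a vector column "features"):

import org.apache.spark.ml.feature.VectorIndexer

// Mark low-cardinality feature columns as categorical before training.
val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(10)   // columns with <= 10 distinct values are treated as categorical
val indexed = indexer.fit(data).transform(data)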

Thanks,

Meihua



On Mon, Oct 26, 2015 at 4:06 PM, DB Tsai  wrote:
> Interesting. For feature sub-sampling, is it per-node or per-tree? Do
> you think you can implement generic GBM and have it merged as part of
> Spark codebase?
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
>
> On Mon, Oct 26, 2015 at 11:42 AM, Meihua Wu
>  wrote:
>> Hi Spark User/Dev,
>>
>> Inspired by the success of XGBoost, I have created a Spark package for
>> gradient boosting tree with 2nd order approximation of arbitrary
>> user-defined loss functions.
>>
>> https://github.com/rotationsymmetry/SparkXGBoost
>>
>> Currently linear (normal) regression, binary classification, Poisson
>> regression are supported. You can extend with other loss function as
>> well.
>>
>> L1, L2, bagging, feature sub-sampling are also employed to avoid overfitting.
>>
>> Thank you for testing. I am looking forward to your comments and
>> suggestions. Bugs or improvements can be reported through GitHub.
>>
>> Many thanks!
>>
>> Meihua
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Running in cluster mode causes native library linking to fail

2015-10-26 Thread prajod.vettiyattil
Hi Bernardo,

Glad that our suggestions helped. A bigger thanks for sharing your solution 
with us. That was a tricky and difficult problem to track and solve !

Regards,
Prajod

From: Bernardo Vecchia Stein [mailto:bernardovst...@gmail.com]
Sent: 26 October 2015 23:41
To: Prajod S Vettiyattil (WT01 - BAS) 
Cc: user 
Subject: Re: Running in cluster mode causes native library linking to fail

Hello guys,
After lots of time trying to make things work, I finally found what was causing 
the issue:
I was calling the function from the library inside a map function, which caused 
the code inside it to be run in executors instead of the driver. Since only the 
driver had loaded the library, the executors would then give an error. The 
tricky part is that the same error message was being replicated in the driver's 
and the executor's logs, so it led me to believe it was a global error. Only 
after testing running stuff *only* on the driver did I discover that 
everything worked.
For future reference: if you are running into this issue, please check if you 
are also loading the library on the executors! In the case of my map example, 
the fix was to create a wrapper function that 1) loaded libraries and then 2) 
called functions within the library. After that, map things to this wrapper 
function. This way, you ensure every executor also loads the libraries.
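A rough sketch of that wrapper pattern (the library name, method, and types are hypothetical):

import org.apache.spark.rdd.RDD

// Referencing this object inside a task forces its initializer to run on the
// executor JVM, so the native library is loaded there too, not only on the driver.
object NativeWrapper {
  System.loadLibrary("mynative")        // hypothetical library name

  def transform(s: String): String = {
    // call into the real JNI binding here; this stub just echoes its input
    s
  }
}

def applyNative(rdd: RDD[String]): RDD[String] =
  rdd.mapPartitions { iter =>
    iter.map(NativeWrapper.transform)   // first use per executor triggers the load
  }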
I'd like to thank Prajod, Renato and Deenar for the help.
Bernardo

On 15 October 2015 at 03:27, 
> wrote:
Forwarding to the group, in case someone else has the same error. Just found 
out that I did not reply to the group in my original reply.

From: Prajod S Vettiyattil (WT01 - BAS)
Sent: 15 October 2015 11:45
To: 'Bernardo Vecchia Stein' 
>
Subject: RE: Running in cluster mode causes native library linking to fail

Hi,

Also try the path settings given here: 
http://stackoverflow.com/questions/12279833/videocapture-opencv-2-4-2-error-in-windows/29278322#29278322

Forgot to add this link in my response earlier:
https://blogs.oracle.com/darcy/entry/purging_ld_library_path
http://www.oracle.com/technetwork/java/javase/jdk7-relnotes-418459.html

So from java 7, LD_LIBRARY_PATH is ignored. This is for Linux and Solaris. And 
probably for all other Unix derivatives.

Also check : System.loadLibrary() should be inside a static {  } block. Please 
check for its syntax on the internet. The loadlibrary function has to be called 
during class load time. That is why the static block is required.

What is your ?

1.  Spark version

2.  OS type and version

3.  Library that you are trying to load.



[I was using OpenCV. Had to go through many trials to get it working 
consistently. Initially, it would work only on dev environment(windows) but not 
on Ubuntu. Its been a few months. There is a stackoverflow answer I have given 
regarding this: 
http://stackoverflow.com/questions/12279833/videocapture-opencv-2-4-2-error-in-windows/29278322#29278322
 ]

Regards,
Prajod

From: Bernardo Vecchia Stein 
[mailto:bernardovst...@gmail.com]
Sent: 15 October 2015 00:36
To: Prajod S Vettiyattil (WT01 - BAS) 
>
Subject: Re: Running in cluster mode causes native library linking to fail

Hello Prajod,
Thanks for your reply! I am also using the standalone cluster manager. I do not 
build the jars in Eclipse and neither use Maven. They are built with sbt by 
hand.
I was setting LD_LIBRARY_PATH and LIBRARY_PATH to point to the paths with the 
library. When I didn't set them and set only PATH instead, spark would just not 
find the libraries (it was another error). I'm not sure what version you are 
using, but it appears I do have to set LD_LIBRARY_PATH in order to make things 
work.
I tried a simpler approach using System.load() with a specific path to the 
library, so I don't have to deal with these paths. However, I still get the 
same error when executing in cluster mode (exactly the same error). Do you have 
any idea why that might be failing?
Thank you again for your attention,
Bernardo

On 14 October 2015 at 03:30, 
> wrote:
Hi,

I have successfully made this working using the “standalone”cluster manager. 
Not tried with Mesos or YARN.

Which of these cluster managers are you using ? 
https://spark.apache.org/docs/1.1.0/cluster-overview.html
•Standalone 
– a simple cluster manager included with Spark that makes it easy to set up a 
cluster.
•Apache 
Mesos – a general 
cluster manager that can also run Hadoop MapReduce and service applications.
•Hadoop 

RE: HiveContext ignores ("skip.header.line.count"="1")

2015-10-26 Thread Felix Cheung
Please open a JIRA?

 
Date: Mon, 26 Oct 2015 15:32:42 +0200
Subject: HiveContext ignores ("skip.header.line.count"="1")
From: daniel.ha...@veracity-group.com
To: user@spark.apache.org

Hi,
I have a csv table in Hive which is configured to skip the header row using 
TBLPROPERTIES("skip.header.line.count"="1"). When querying from Hive the header 
row is not included in the data, but when running the same query via 
HiveContext I get the header row.

I made sure that HiveContext sees the skip.header.line.count setting by running 
"show create table"
Any ideas?
Thank you.
Daniel

Spark Implementation of XGBoost

2015-10-26 Thread Meihua Wu
Hi Spark User/Dev,

Inspired by the success of XGBoost, I have created a Spark package for
gradient boosting tree with 2nd order approximation of arbitrary
user-defined loss functions.

https://github.com/rotationsymmetry/SparkXGBoost

Currently linear (normal) regression, binary classification, Poisson
regression are supported. You can extend with other loss function as
well.

L1, L2, bagging, feature sub-sampling are also employed to avoid overfitting.

Thank you for testing. I am looking forward to your comments and
suggestions. Bugs or improvements can be reported through GitHub.

Many thanks!

Meihua

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Maven build failed (Spark master)

2015-10-26 Thread Kayode Odeyemi
I see a lot of output like this after a successful Maven build:

cp: /usr/local/spark-latest/spark-[WARNING] See
http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/
part-r-8.gz.parquet: No such file or directory

Seems it fails when it tries to package the build as an archive.

I'm using the latest code on github master.

Any ideas please?

On Mon, Oct 26, 2015 at 6:20 PM, Yana Kadiyska 
wrote:

> In 1.4 ./make_distribution produces a .tgz file in the root directory
> (same directory that make_distribution is in)
>
>
>
> On Mon, Oct 26, 2015 at 8:46 AM, Kayode Odeyemi  wrote:
>
>> Hi,
>>
>> The ./make_distribution task completed. However, I can't seem to locate
>> the
>> .tar.gz file.
>>
>> Where does Spark save this? or should I just work with the dist directory?
>>
>> On Fri, Oct 23, 2015 at 4:23 PM, Kayode Odeyemi 
>> wrote:
>>
>>> I saw this when I tested manually (without ./make-distribution)
>>>
>>> Detected Maven Version: 3.2.2 is not in the allowed range 3.3.3.
>>>
>>> So I simply upgraded maven to 3.3.3.
>>>
>>> Resolved. Thanks
>>>
>>> On Fri, Oct 23, 2015 at 3:17 PM, Sean Owen  wrote:
>>>
 This doesn't show the actual error output from Maven. I have a strong
 guess that you haven't set MAVEN_OPTS to increase the memory Maven can
 use.

 On Fri, Oct 23, 2015 at 6:14 AM, Kayode Odeyemi 
 wrote:
 > Hi,
 >
 > I can't seem to get a successful maven build. Please see command
 output
 > below:
 >
 > bash-3.2$ ./make-distribution.sh --name spark-latest --tgz --mvn mvn
 > -Dhadoop.version=2.7.0 -Phadoop-2.7 -Phive -Phive-thriftserver
 -DskipTests
 > clean package
 > +++ dirname ./make-distribution.sh
 > ++ cd .
 > ++ pwd
 > + SPARK_HOME=/usr/local/spark-latest
 > + DISTDIR=/usr/local/spark-latest/dist
 > + SPARK_TACHYON=false
 > + TACHYON_VERSION=0.7.1
 > + TACHYON_TGZ=tachyon-0.7.1-bin.tar.gz
 > +
 > TACHYON_URL=
 https://github.com/amplab/tachyon/releases/download/v0.7.1/tachyon-0.7.1-bin.tar.gz
 > + MAKE_TGZ=false
 > + NAME=none
 > + MVN=/usr/local/spark-latest/build/mvn
 > + ((  12  ))
 > + case $1 in
 > + NAME=spark-latest
 > + shift
 > + shift
 > + ((  10  ))
 > + case $1 in
 > + MAKE_TGZ=true
 > + shift
 > + ((  9  ))
 > + case $1 in
 > + MVN=mvn
 > + shift
 > + shift
 > + ((  7  ))
 > + case $1 in
 > + break
 > + '[' -z
 /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
 > + '[' -z
 /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
 > ++ command -v git
 > + '[' /usr/bin/git ']'
 > ++ git rev-parse --short HEAD
 > + GITREV=487d409
 > + '[' '!' -z 487d409 ']'
 > + GITREVSTRING=' (git revision 487d409)'
 > + unset GITREV
 > ++ command -v mvn
 > + '[' '!' /usr/bin/mvn ']'
 > ++ mvn help:evaluate -Dexpression=project.version
 -Dhadoop.version=2.7.0
 > -Phadoop-2.7 -Phive -Phive-thriftserver -DskipTests clean package
 > ++ grep -v INFO
 > ++ tail -n 1
 > + VERSION='[ERROR] [Help 1]
 >
 http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
 '
 >
 > Same output error with JDK 7
 >
 > Appreciate your help.
 >
 >

>>>
>>>
>>>
>>
>


Re: Maven build failed (Spark master)

2015-10-26 Thread Ted Yu
If you use the command shown in:
https://github.com/apache/spark/pull/9281

You should have got the following:

./dist/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-8.gz.parquet
./dist/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-7.gz.parquet
./dist/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-4.gz.parquet
./dist/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-2.gz.parquet
./dist/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-5.gz.parquet

On Mon, Oct 26, 2015 at 11:47 AM, Kayode Odeyemi  wrote:

> I see a lot of output like this after a successful Maven build:
>
> cp: /usr/local/spark-latest/spark-[WARNING] See
> http://docs.codehaus.org/display/MAVENUSER/Shade+Plugin-bin-spark-latest/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/
> part-r-8.gz.parquet: No such file or directory
>
> Seems it fails when it tries to package the build as an archive.
>
> I'm using the latest code on github master.
>
> Any ideas please?
>
> On Mon, Oct 26, 2015 at 6:20 PM, Yana Kadiyska 
> wrote:
>
>> In 1.4 ./make_distribution produces a .tgz file in the root directory
>> (same directory that make_distribution is in)
>>
>>
>>
>> On Mon, Oct 26, 2015 at 8:46 AM, Kayode Odeyemi 
>> wrote:
>>
>>> Hi,
>>>
>>> The ./make_distribution task completed. However, I can't seem to locate
>>> the
>>> .tar.gz file.
>>>
>>> Where does Spark save this? or should I just work with the dist
>>> directory?
>>>
>>> On Fri, Oct 23, 2015 at 4:23 PM, Kayode Odeyemi 
>>> wrote:
>>>
 I saw this when I tested manually (without ./make-distribution)

 Detected Maven Version: 3.2.2 is not in the allowed range 3.3.3.

 So I simply upgraded maven to 3.3.3.

 Resolved. Thanks

 On Fri, Oct 23, 2015 at 3:17 PM, Sean Owen  wrote:

> This doesn't show the actual error output from Maven. I have a strong
> guess that you haven't set MAVEN_OPTS to increase the memory Maven can
> use.
>
> On Fri, Oct 23, 2015 at 6:14 AM, Kayode Odeyemi 
> wrote:
> > Hi,
> >
> > I can't seem to get a successful maven build. Please see command
> output
> > below:
> >
> > bash-3.2$ ./make-distribution.sh --name spark-latest --tgz --mvn mvn
> > -Dhadoop.version=2.7.0 -Phadoop-2.7 -Phive -Phive-thriftserver
> -DskipTests
> > clean package
> > +++ dirname ./make-distribution.sh
> > ++ cd .
> > ++ pwd
> > + SPARK_HOME=/usr/local/spark-latest
> > + DISTDIR=/usr/local/spark-latest/dist
> > + SPARK_TACHYON=false
> > + TACHYON_VERSION=0.7.1
> > + TACHYON_TGZ=tachyon-0.7.1-bin.tar.gz
> > +
> > TACHYON_URL=
> https://github.com/amplab/tachyon/releases/download/v0.7.1/tachyon-0.7.1-bin.tar.gz
> > + MAKE_TGZ=false
> > + NAME=none
> > + MVN=/usr/local/spark-latest/build/mvn
> > + ((  12  ))
> > + case $1 in
> > + NAME=spark-latest
> > + shift
> > + shift
> > + ((  10  ))
> > + case $1 in
> > + MAKE_TGZ=true
> > + shift
> > + ((  9  ))
> > + case $1 in
> > + MVN=mvn
> > + shift
> > + shift
> > + ((  7  ))
> > + case $1 in
> > + break
> > + '[' -z
> /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
> > + '[' -z
> /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
> > ++ command -v git
> > + '[' /usr/bin/git ']'
> > ++ git rev-parse --short HEAD
> > + GITREV=487d409
> > + '[' '!' -z 487d409 ']'
> > + GITREVSTRING=' (git revision 487d409)'
> > + unset GITREV
> > ++ command -v mvn
> > + '[' '!' /usr/bin/mvn ']'
> > ++ mvn help:evaluate -Dexpression=project.version
> -Dhadoop.version=2.7.0
> > -Phadoop-2.7 -Phive -Phive-thriftserver -DskipTests clean package
> > ++ grep -v INFO
> > ++ tail -n 1
> > + VERSION='[ERROR] [Help 1]
> >
> http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
> '
> >
> > Same output error with JDK 7
> >
> > Appreciate your help.
> >
> >
>



>>>
>>
>
>


Re: Error Compiling Spark 1.4.1 w/ Scala 2.11 & Hive Support

2015-10-26 Thread Bryan Jeffrey
All,

The error resolved to a bad version of jline being pulled from Maven.  The jline
version is defined as 'scala.version' -- that 2.11 version does not exist in
Maven.  Instead the following should be used:

 
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>jline</artifactId>
  <version>2.11.0-M3</version>
</dependency>

Regards,

Bryan Jeffrey

On Mon, Oct 26, 2015 at 9:01 AM, Bryan Jeffrey 
wrote:

> All,
>
> I'm seeing the following error compiling Spark 1.4.1 w/ Scala 2.11 & Hive
> support. Any ideas?
>
> mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
> -Phive-thriftserver package
>
> [INFO] Spark Project Parent POM .. SUCCESS [4.124s]
> [INFO] Spark Launcher Project  SUCCESS [9.001s]
> [INFO] Spark Project Networking .. SUCCESS [7.871s]
> [INFO] Spark Project Shuffle Streaming Service ... SUCCESS [3.904s]
> [INFO] Spark Project Unsafe .. SUCCESS [3.095s]
> [INFO] Spark Project Core  SUCCESS
> [24.768s]
> [INFO] Spark Project Bagel ... SUCCESS [2.029s]
> [INFO] Spark Project GraphX .. SUCCESS [4.057s]
> [INFO] Spark Project Streaming ... SUCCESS [9.774s]
> [INFO] Spark Project Catalyst  SUCCESS [6.804s]
> [INFO] Spark Project SQL . SUCCESS [9.606s]
> [INFO] Spark Project ML Library .. SUCCESS
> [10.872s]
> [INFO] Spark Project Tools ... SUCCESS [0.627s]
> [INFO] Spark Project Hive  SUCCESS
> [13.463s]
> [INFO] Spark Project REPL  SUCCESS [1.414s]
> [INFO] Spark Project YARN  SUCCESS [2.433s]
> [INFO] Spark Project Hive Thrift Server .. FAILURE [8.097s]
>
>
> [ERROR]
> /spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:25:
> object ConsoleReader is not a member of package jline
> [ERROR] import jline.{ConsoleReader, History}
> [ERROR]^
> [WARNING] Class jline.Completor not found - continuing with a stub.
> [WARNING] Class jline.ConsoleReader not found - continuing with a stub.
> [ERROR]
> /spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:171:
> not found: type ConsoleReader
> [ERROR] val reader = new ConsoleReader()
> [ERROR]  ^
> [ERROR] Class jline.Completor not found - continuing with a stub.
>


RE: Loading binary files from NFS share

2015-10-26 Thread Andrianasolo Fanilo
Hi again,

I found this: https://github.com/NetApp/NetApp-Hadoop-NFS-Connector

Maybe it will enable you to read NFS data from Spark at least. Has anyone from the 
community used it?

BR,
Fanilo

From: Andrianasolo Fanilo
Sent: Monday, 26 October 2015 15:24
To: 'Kayode Odeyemi'; user
Subject: RE: Loading binary files from NFS share

Hi,

I believe binaryFiles uses a custom Hadoop Input Format, so it can only read 
specific Hadoop protocols.

You can find the full list of supported protocols by typing “Hadoop filesystems 
hdfs hftp” in Google (the link I found is a little bit long and references the 
Hadoop Definitive Guide, but here it is shortened ☺ : http://bit.ly/1PNWEeN, I 
couldn’t find a more direct link to the Hadoop website or Wiki though).

I will assume that nfs is not possible given this list.

Best Regards,

Fanilo


From: Kayode Odeyemi [mailto:drey...@gmail.com]
Sent: Monday, 26 October 2015 14:59
To: user
Subject: Loading binary files from NFS share

Hi,

Is it possible to load binary files from NFS share like this:

sc.binaryFiles("nfs://host/mountpath")

I understand that it takes a path, but want to know if it allows protocol.

Appreciate your help.




This e-mail and the documents attached are confidential and intended solely for 
the addressee; it may also be privileged. If you receive this e-mail in error, 
please notify the sender immediately and destroy it. As its integrity cannot be 
secured on the Internet, the Worldline liability cannot be triggered for the 
message content. Although the sender endeavours to maintain a computer 
virus-free network, the sender does not warrant that this transmission is 
virus-free and will not be liable for any damages resulting from any virus 
transmitted.


Loading binary files from NFS share

2015-10-26 Thread Kayode Odeyemi
Hi,

Is it possible to load binary files from NFS share like this:

sc.binaryFiles("nfs://host/mountpath")

I understand that it takes a path, but want to know if it allows protocol.

Appreciate your help.


RE: Loading binary files from NFS share

2015-10-26 Thread Andrianasolo Fanilo
Hi,

I believe binaryFiles uses a custom Hadoop Input Format, so it can only read 
specific Hadoop protocols.

You can find the full list of supported protocols by typing “Hadoop filesystems 
hdfs hftp” in Google (the link I found is a little bit long and references the 
Hadoop Definitive Guide, but here it is shortened ☺ : http://bit.ly/1PNWEeN, I 
couldn’t find a more direct link to the Hadoop website or Wiki though).

I will assume that nfs is not possible given this list.
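For what it's worth, if the share is mounted at the same local path on every worker, one workaround is to read it through the local filesystem scheme, which Hadoop does support (the mount point below is an assumption):

val files = sc.binaryFiles("file:///mnt/nfs/share")   // path must exist on every node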

Best Regards,

Fanilo


From: Kayode Odeyemi [mailto:drey...@gmail.com]
Sent: Monday, 26 October 2015 14:59
To: user
Subject: Loading binary files from NFS share

Hi,

Is it possible to load binary files from NFS share like this:

sc.binaryFiles("nfs://host/mountpath")

I understand that it takes a path, but want to know if it allows protocol.

Appreciate your help.




This e-mail and the documents attached are confidential and intended solely for 
the addressee; it may also be privileged. If you receive this e-mail in error, 
please notify the sender immediately and destroy it. As its integrity cannot be 
secured on the Internet, the Worldline liability cannot be triggered for the 
message content. Although the sender endeavours to maintain a computer 
virus-free network, the sender does not warrant that this transmission is 
virus-free and will not be liable for any damages resulting from any virus 
transmitted.


Re: Error Compiling Spark 1.4.1 w/ Scala 2.11 & Hive Support

2015-10-26 Thread Sean Owen
Did you switch the build to Scala 2.11 by running the script in dev/? It
won't work otherwise, but does work if you do. @Ted 2.11 was supported in
1.4, not just 1.5.

On Mon, Oct 26, 2015 at 2:13 PM, Bryan Jeffrey 
wrote:

> All,
>
> The error resolved to a bad version of jline pulling from Maven.  The
> jline version is defined as 'scala.version' -- the 2.11 version does not
> exist in maven.  Instead the following should be used:
>
>   <dependency>
>     <groupId>org.scala-lang</groupId>
>     <artifactId>jline</artifactId>
>     <version>2.11.0-M3</version>
>   </dependency>
>
> Regards,
>
> Bryan Jeffrey
>
> On Mon, Oct 26, 2015 at 9:01 AM, Bryan Jeffrey 
> wrote:
>
>> All,
>>
>> I'm seeing the following error compiling Spark 1.4.1 w/ Scala 2.11 & Hive
>> support. Any ideas?
>>
>> mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
>> -Phive-thriftserver package
>>
>> [INFO] Spark Project Parent POM .. SUCCESS
>> [4.124s]
>> [INFO] Spark Launcher Project  SUCCESS
>> [9.001s]
>> [INFO] Spark Project Networking .. SUCCESS
>> [7.871s]
>> [INFO] Spark Project Shuffle Streaming Service ... SUCCESS
>> [3.904s]
>> [INFO] Spark Project Unsafe .. SUCCESS
>> [3.095s]
>> [INFO] Spark Project Core  SUCCESS
>> [24.768s]
>> [INFO] Spark Project Bagel ... SUCCESS
>> [2.029s]
>> [INFO] Spark Project GraphX .. SUCCESS
>> [4.057s]
>> [INFO] Spark Project Streaming ... SUCCESS
>> [9.774s]
>> [INFO] Spark Project Catalyst  SUCCESS
>> [6.804s]
>> [INFO] Spark Project SQL . SUCCESS
>> [9.606s]
>> [INFO] Spark Project ML Library .. SUCCESS
>> [10.872s]
>> [INFO] Spark Project Tools ... SUCCESS
>> [0.627s]
>> [INFO] Spark Project Hive  SUCCESS
>> [13.463s]
>> [INFO] Spark Project REPL  SUCCESS
>> [1.414s]
>> [INFO] Spark Project YARN  SUCCESS
>> [2.433s]
>> [INFO] Spark Project Hive Thrift Server .. FAILURE
>> [8.097s]
>>
>>
>> [ERROR]
>> /spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:25:
>> object ConsoleReader is not a member of package jline
>> [ERROR] import jline.{ConsoleReader, History}
>> [ERROR]^
>> [WARNING] Class jline.Completor not found - continuing with a stub.
>> [WARNING] Class jline.ConsoleReader not found - continuing with a stub.
>> [ERROR]
>> /spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:171:
>> not found: type ConsoleReader
>> [ERROR] val reader = new ConsoleReader()
>> [ERROR]  ^
>> [ERROR] Class jline.Completor not found - continuing with a stub.
>>
>
>


Re: Anyone feels sparkSQL in spark1.5.1 very slow?

2015-10-26 Thread Yin Huai
@filthysocks, can you get the output of jmap -histo before the OOM (
http://docs.oracle.com/javase/7/docs/technotes/tools/share/jmap.html)?

On Mon, Oct 26, 2015 at 6:35 AM, filthysocks  wrote:

> We upgrade from 1.4.1 to 1.5 and it's a pain
> see
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-driver-memory-problems-while-doing-Cross-Validation-do-not-occur-with-1-4-1-td25076.html
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-feels-sparkSQL-in-spark1-5-1-very-slow-tp25154p25204.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Jinfeng Li
Hi, yes, it should be the same issue, but the solution doesn't apply in our
situation. Anyway, thanks a lot for your replies.

On Mon, Oct 26, 2015 at 7:44 PM Sean Owen  wrote:

> Hm, now I wonder if it's the same issue here:
> https://issues.apache.org/jira/browse/SPARK-10149
>
> Does the setting described there help?
>
> On Mon, Oct 26, 2015 at 11:39 AM, Jinfeng Li  wrote:
>
>> Hi, I have already tried the same code with Spark 1.3.1, there is no such
>> problem. The configuration files are all directly copied from Spark 1.5.1.
>> I feel it is a bug on Spark 1.5.1.
>>
>> Thanks a lot for your response.
>>
>> On Mon, Oct 26, 2015 at 7:21 PM Sean Owen  wrote:
>>
>>> Yeah, are these stats actually reflecting data read locally, like
>>> through the loopback interface? I'm also no expert on the internals here
>>> but this may be measuring effectively local reads. Or are you sure it's not?
>>>
>>> On Mon, Oct 26, 2015 at 11:14 AM, Steve Loughran >> > wrote:
>>>

 > On 26 Oct 2015, at 09:28, Jinfeng Li  wrote:
 >
 > Replication factor is 3 and we have 18 data nodes. We check HDFS
 webUI, data is evenly distributed among 18 machines.
 >


 every block in HDFS (usually 64-128-256 MB) is distributed across three
 machines, meaning 3 machines have it local, 15 have it remote.

 for data locality to work properly, you need the executors to be
 reading in the blocks of data local to them, and not data from other parts
 of the files. Spark does try to do locality, but if there's only a limited
 set of executors, then more of the workload is remote vs local.

 I don't know of an obvious way to get the metrics here of local vs
 remote; I don't see the HDFS client library tracking that —though it should
 be the place to collect stats on local/remote/domain-socket-direct IO. Does
 anyone know somewhere in the Spark metrics which tracks placement locality?
 If not, both layers could have some more metrics added.
>>>
>>>
>>>
>


Error Compiling Spark 1.4.1 w/ Scala 2.11 & Hive Support

2015-10-26 Thread Bryan Jeffrey
All,

I'm seeing the following error compiling Spark 1.4.1 w/ Scala 2.11 & Hive
support. Any ideas?

mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
-Phive-thriftserver package

[INFO] Spark Project Parent POM .. SUCCESS [4.124s]
[INFO] Spark Launcher Project  SUCCESS [9.001s]
[INFO] Spark Project Networking .. SUCCESS [7.871s]
[INFO] Spark Project Shuffle Streaming Service ... SUCCESS [3.904s]
[INFO] Spark Project Unsafe .. SUCCESS [3.095s]
[INFO] Spark Project Core  SUCCESS [24.768s]
[INFO] Spark Project Bagel ... SUCCESS [2.029s]
[INFO] Spark Project GraphX .. SUCCESS [4.057s]
[INFO] Spark Project Streaming ... SUCCESS [9.774s]
[INFO] Spark Project Catalyst  SUCCESS [6.804s]
[INFO] Spark Project SQL . SUCCESS [9.606s]
[INFO] Spark Project ML Library .. SUCCESS [10.872s]
[INFO] Spark Project Tools ... SUCCESS [0.627s]
[INFO] Spark Project Hive  SUCCESS [13.463s]
[INFO] Spark Project REPL  SUCCESS [1.414s]
[INFO] Spark Project YARN  SUCCESS [2.433s]
[INFO] Spark Project Hive Thrift Server .. FAILURE [8.097s]


[ERROR]
/spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:25:
object ConsoleReader is not a member of package jline
[ERROR] import jline.{ConsoleReader, History}
[ERROR]^
[WARNING] Class jline.Completor not found - continuing with a stub.
[WARNING] Class jline.ConsoleReader not found - continuing with a stub.
[ERROR]
/spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:171:
not found: type ConsoleReader
[ERROR] val reader = new ConsoleReader()
[ERROR]  ^
[ERROR] Class jline.Completor not found - continuing with a stub.


Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-26 Thread Jerry Lam
Hi Fengdong,

Why does it need more memory on the driver side when there are many partitions? It 
seems the implementation can only support use cases with a dozen or so partitions; 
once it is over 100, it falls apart. It is also quite slow to initialize the 
loading of partition tables when the number of partitions is over 100. 

Best Regards,

Jerry

Sent from my iPhone

> On 26 Oct, 2015, at 2:50 am, Fengdong Yu  wrote:
> 
> How many partitions you generated?
> if Millions generated, then there is a huge memory consumed.
> 
> 
> 
> 
> 
>> On Oct 26, 2015, at 10:58 AM, Jerry Lam  wrote:
>> 
>> Hi guys,
>> 
>> I mentioned that the partitions are generated so I tried to read the 
>> partition data from it. The driver is OOM after few minutes. The stack trace 
>> is below. It looks very similar to the the jstack above (note on the refresh 
>> method). Thanks!
>> 
>> Name: java.lang.OutOfMemoryError
>> Message: GC overhead limit exceeded
>> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
>> java.lang.StringBuilder.append(StringBuilder.java:132)
>> org.apache.hadoop.fs.Path.toString(Path.java:384)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
>> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> scala.Option.getOrElse(Option.scala:120)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
>> org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:31)
>> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
>> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
>> 
>>> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam  wrote:
>>> Hi Josh,
>>> 
>>> No I don't have speculation enabled. The driver took about few hours until 
>>> it was OOM. Interestingly, all partitions are generated successfully 
>>> (_SUCCESS file is written in the output directory). Is there a reason why 
>>> the driver needs so much memory? The jstack revealed that it called refresh 
>>> some file statuses. Is there a way to avoid OutputCommitCoordinator to use 
>>> so much memory? 
>>> 
>>> Ultimately, I choose to use partitions because most of the queries I have 
>>> will execute based the partition field. For example, "SELECT events from 
>>> customer where customer_id = 1234". If the partition is based on 
>>> customer_id, all events for a customer can be easily retrieved without 
>>> filtering the entire dataset which is much more efficient (I hope). 
>>> However, I notice that the implementation of the partition logic does not 
>>> seem to allow this type of use cases without using a lot of memory which 

Spark 1.5.1 hadoop 2.4 does not clear hive staging files after job finishes

2015-10-26 Thread unk1102
Hi, I have a Spark job which creates Hive table partitions. I have switched to
Spark 1.5.1, and it creates many Hive staging files and does not delete them
after the job finishes. Is this a bug, or do I need to disable something to
prevent the Hive staging files from being created, or at least have them
deleted? The Hive staging files look like the following:

.hive-staging_hive_blabla



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-hadoop-2-4-does-not-clear-hive-staging-files-after-job-finishes-tp25203.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Loading Files from HDFS Incurs Network Communication

2015-10-26 Thread Steve Loughran

On 26 Oct 2015, at 11:21, Sean Owen 
> wrote:

Yeah, are these stats actually reflecting data read locally, like through the 
loopback interface? I'm also no expert on the internals here but this may be 
measuring effectively local reads. Or are you sure it's not?


HDFS stats are really the general filesystem stats: they measure data through 
the input and output streams, not whether they were to/from local or remote 
systems. Fixable, and metrics are always good, though as Hadoop (currently) 
uses Hadoop metrics 2, not the codahale APIs, it's not seamless to glue it up 
with the spark context metric registry

On Mon, Oct 26, 2015 at 11:14 AM, Steve Loughran 
> wrote:

> On 26 Oct 2015, at 09:28, Jinfeng Li 
> > wrote:
>
> Replication factor is 3 and we have 18 data nodes. We check HDFS webUI, data 
> is evenly distributed among 18 machines.
>


every block in HDFS (usually 64-128-256 MB) is distributed across three 
machines, meaning 3 machines have it local, 15 have it remote.

for data locality to work properly, you need the executors to be reading in the 
blocks of data local to them, and not data from other parts of the files. Spark 
does try to do locality, but if there's only a limited set of executors, then 
more of the workload is remote vs local.

I don't know of an obvious way to get the metrics here of local vs remote; I 
don't see the HDFS client library tracking that —though it should be the place 
to collect stats on local/remote/domain-socket-direct IO. Does anyone know 
somewhere in the Spark metrics which tracks placement locality? If not, both 
layers could have some more metrics added.
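
For a rough view from the Spark side, the driver can at least print the hosts Spark
prefers for each partition of an HDFS-backed RDD (a sketch using the getPreferredLocs
developer API; the path is illustrative):

val rdd = sc.textFile("hdfs:///data/some/path")
(0 until rdd.partitions.length).foreach { i =>
  // getPreferredLocs is a DeveloperApi on SparkContext
  println(s"partition $i -> ${sc.getPreferredLocs(rdd, i).mkString(", ")}")
}

Comparing these hosts with where the tasks actually ran (the locality level column in
the UI) gives a crude picture of how much is read remotely.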




Re: Error Compiling Spark 1.4.1 w/ Scala 2.11 & Hive Support

2015-10-26 Thread Ted Yu
Scala 2.11 is supported in 1.5.1 release:

http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-parent_2.11%22

Can you upgrade ?

Cheers

On Mon, Oct 26, 2015 at 6:01 AM, Bryan Jeffrey 
wrote:

> All,
>
> I'm seeing the following error compiling Spark 1.4.1 w/ Scala 2.11 & Hive
> support. Any ideas?
>
> mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
> -Phive-thriftserver package
>
> [INFO] Spark Project Parent POM .. SUCCESS [4.124s]
> [INFO] Spark Launcher Project  SUCCESS [9.001s]
> [INFO] Spark Project Networking .. SUCCESS [7.871s]
> [INFO] Spark Project Shuffle Streaming Service ... SUCCESS [3.904s]
> [INFO] Spark Project Unsafe .. SUCCESS [3.095s]
> [INFO] Spark Project Core  SUCCESS
> [24.768s]
> [INFO] Spark Project Bagel ... SUCCESS [2.029s]
> [INFO] Spark Project GraphX .. SUCCESS [4.057s]
> [INFO] Spark Project Streaming ... SUCCESS [9.774s]
> [INFO] Spark Project Catalyst  SUCCESS [6.804s]
> [INFO] Spark Project SQL . SUCCESS [9.606s]
> [INFO] Spark Project ML Library .. SUCCESS
> [10.872s]
> [INFO] Spark Project Tools ... SUCCESS [0.627s]
> [INFO] Spark Project Hive  SUCCESS
> [13.463s]
> [INFO] Spark Project REPL  SUCCESS [1.414s]
> [INFO] Spark Project YARN  SUCCESS [2.433s]
> [INFO] Spark Project Hive Thrift Server .. FAILURE [8.097s]
>
>
> [ERROR]
> /spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:25:
> object ConsoleReader is not a member of package jline
> [ERROR] import jline.{ConsoleReader, History}
> [ERROR]^
> [WARNING] Class jline.Completor not found - continuing with a stub.
> [WARNING] Class jline.ConsoleReader not found - continuing with a stub.
> [ERROR]
> /spark/spark-1.4.1.hive.bak/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:171:
> not found: type ConsoleReader
> [ERROR] val reader = new ConsoleReader()
> [ERROR]  ^
> [ERROR] Class jline.Completor not found - continuing with a stub.
>


Re: Maven build failed (Spark master)

2015-10-26 Thread Kayode Odeyemi
Hi,

The ./make_distribution task completed. However, I can't seem to locate the
.tar.gz file.

Where does Spark save this? or should I just work with the dist directory?

On Fri, Oct 23, 2015 at 4:23 PM, Kayode Odeyemi  wrote:

> I saw this when I tested manually (without ./make-distribution)
>
> Detected Maven Version: 3.2.2 is not in the allowed range 3.3.3.
>
> So I simply upgraded maven to 3.3.3.
>
> Resolved. Thanks
>
> On Fri, Oct 23, 2015 at 3:17 PM, Sean Owen  wrote:
>
>> This doesn't show the actual error output from Maven. I have a strong
>> guess that you haven't set MAVEN_OPTS to increase the memory Maven can
>> use.
>>
>> On Fri, Oct 23, 2015 at 6:14 AM, Kayode Odeyemi 
>> wrote:
>> > Hi,
>> >
>> > I can't seem to get a successful maven build. Please see command output
>> > below:
>> >
>> > bash-3.2$ ./make-distribution.sh --name spark-latest --tgz --mvn mvn
>> > -Dhadoop.version=2.7.0 -Phadoop-2.7 -Phive -Phive-thriftserver
>> -DskipTests
>> > clean package
>> > +++ dirname ./make-distribution.sh
>> > ++ cd .
>> > ++ pwd
>> > + SPARK_HOME=/usr/local/spark-latest
>> > + DISTDIR=/usr/local/spark-latest/dist
>> > + SPARK_TACHYON=false
>> > + TACHYON_VERSION=0.7.1
>> > + TACHYON_TGZ=tachyon-0.7.1-bin.tar.gz
>> > +
>> > TACHYON_URL=
>> https://github.com/amplab/tachyon/releases/download/v0.7.1/tachyon-0.7.1-bin.tar.gz
>> > + MAKE_TGZ=false
>> > + NAME=none
>> > + MVN=/usr/local/spark-latest/build/mvn
>> > + ((  12  ))
>> > + case $1 in
>> > + NAME=spark-latest
>> > + shift
>> > + shift
>> > + ((  10  ))
>> > + case $1 in
>> > + MAKE_TGZ=true
>> > + shift
>> > + ((  9  ))
>> > + case $1 in
>> > + MVN=mvn
>> > + shift
>> > + shift
>> > + ((  7  ))
>> > + case $1 in
>> > + break
>> > + '[' -z
>> /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
>> > + '[' -z
>> /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
>> > ++ command -v git
>> > + '[' /usr/bin/git ']'
>> > ++ git rev-parse --short HEAD
>> > + GITREV=487d409
>> > + '[' '!' -z 487d409 ']'
>> > + GITREVSTRING=' (git revision 487d409)'
>> > + unset GITREV
>> > ++ command -v mvn
>> > + '[' '!' /usr/bin/mvn ']'
>> > ++ mvn help:evaluate -Dexpression=project.version -Dhadoop.version=2.7.0
>> > -Phadoop-2.7 -Phive -Phive-thriftserver -DskipTests clean package
>> > ++ grep -v INFO
>> > ++ tail -n 1
>> > + VERSION='[ERROR] [Help 1]
>> > http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
>> '
>> >
>> > Same output error with JDK 7
>> >
>> > Appreciate your help.
>> >
>> >
>>
>
>
>


HiveContext ignores ("skip.header.line.count"="1")

2015-10-26 Thread Daniel Haviv
Hi,
I have a csv table in Hive which is configured to skip the header row using
TBLPROPERTIES("skip.header.line.count"="1").
When querying from Hive the header row is not included in the data, but
when running the same query via HiveContext I get the header row.

I made sure that HiveContext sees the skip.header.line.count setting by
running "show create table"

Any ideas?

Thank you.
Daniel


Re: Anyone feels sparkSQL in spark1.5.1 very slow?

2015-10-26 Thread filthysocks
We upgraded from 1.4.1 to 1.5 and it's a pain;
see
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-driver-memory-problems-while-doing-Cross-Validation-do-not-occur-with-1-4-1-td25076.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-feels-sparkSQL-in-spark1-5-1-very-slow-tp25154p25204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Maven build failed (Spark master)

2015-10-26 Thread Yana Kadiyska
In 1.4 ./make_distribution produces a .tgz file in the root directory (same
directory that make_distribution is in)



On Mon, Oct 26, 2015 at 8:46 AM, Kayode Odeyemi  wrote:

> Hi,
>
> The ./make_distribution task completed. However, I can't seem to locate the
> .tar.gz file.
>
> Where does Spark save this? or should I just work with the dist directory?
>
> On Fri, Oct 23, 2015 at 4:23 PM, Kayode Odeyemi  wrote:
>
>> I saw this when I tested manually (without ./make-distribution)
>>
>> Detected Maven Version: 3.2.2 is not in the allowed range 3.3.3.
>>
>> So I simply upgraded maven to 3.3.3.
>>
>> Resolved. Thanks
>>
>> On Fri, Oct 23, 2015 at 3:17 PM, Sean Owen  wrote:
>>
>>> This doesn't show the actual error output from Maven. I have a strong
>>> guess that you haven't set MAVEN_OPTS to increase the memory Maven can
>>> use.
>>>
>>> On Fri, Oct 23, 2015 at 6:14 AM, Kayode Odeyemi 
>>> wrote:
>>> > Hi,
>>> >
>>> > I can't seem to get a successful maven build. Please see command output
>>> > below:
>>> >
>>> > bash-3.2$ ./make-distribution.sh --name spark-latest --tgz --mvn mvn
>>> > -Dhadoop.version=2.7.0 -Phadoop-2.7 -Phive -Phive-thriftserver
>>> -DskipTests
>>> > clean package
>>> > +++ dirname ./make-distribution.sh
>>> > ++ cd .
>>> > ++ pwd
>>> > + SPARK_HOME=/usr/local/spark-latest
>>> > + DISTDIR=/usr/local/spark-latest/dist
>>> > + SPARK_TACHYON=false
>>> > + TACHYON_VERSION=0.7.1
>>> > + TACHYON_TGZ=tachyon-0.7.1-bin.tar.gz
>>> > +
>>> > TACHYON_URL=
>>> https://github.com/amplab/tachyon/releases/download/v0.7.1/tachyon-0.7.1-bin.tar.gz
>>> > + MAKE_TGZ=false
>>> > + NAME=none
>>> > + MVN=/usr/local/spark-latest/build/mvn
>>> > + ((  12  ))
>>> > + case $1 in
>>> > + NAME=spark-latest
>>> > + shift
>>> > + shift
>>> > + ((  10  ))
>>> > + case $1 in
>>> > + MAKE_TGZ=true
>>> > + shift
>>> > + ((  9  ))
>>> > + case $1 in
>>> > + MVN=mvn
>>> > + shift
>>> > + shift
>>> > + ((  7  ))
>>> > + case $1 in
>>> > + break
>>> > + '[' -z
>>> /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
>>> > + '[' -z
>>> /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
>>> > ++ command -v git
>>> > + '[' /usr/bin/git ']'
>>> > ++ git rev-parse --short HEAD
>>> > + GITREV=487d409
>>> > + '[' '!' -z 487d409 ']'
>>> > + GITREVSTRING=' (git revision 487d409)'
>>> > + unset GITREV
>>> > ++ command -v mvn
>>> > + '[' '!' /usr/bin/mvn ']'
>>> > ++ mvn help:evaluate -Dexpression=project.version
>>> -Dhadoop.version=2.7.0
>>> > -Phadoop-2.7 -Phive -Phive-thriftserver -DskipTests clean package
>>> > ++ grep -v INFO
>>> > ++ tail -n 1
>>> > + VERSION='[ERROR] [Help 1]
>>> >
>>> http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException'
>>> >
>>> > Same output error with JDK 7
>>> >
>>> > Appreciate your help.
>>> >
>>> >
>>>
>>
>>
>>
>


Re: Spark scala REPL - Unable to create sqlContext

2015-10-26 Thread Richard Hillegas

Note that embedded Derby supports multiple, simultaneous connections, that
is, multiple simultaneous users. But a Derby database is owned by the
process which boots it. Only one process can boot a Derby database at a
given time. The creation of multiple SQL contexts must be spawning multiple
attempts to boot and own the database. If multiple different processes want
to access the same Derby database simultaneously, then the database should
be booted by the Derby network server. After that, the processes which want
to access the database simultaneously can use the Derby network client
driver, not the Derby embedded driver. For more information, see the Derby
Server and Administration Guide:
http://db.apache.org/derby/docs/10.12/adminguide/index.html

Thanks,
Rick Hillegas



Deenar Toraskar  wrote on 10/25/2015 11:29:54
PM:

> From: Deenar Toraskar 
> To: "Ge, Yao (Y.)" 
> Cc: Ted Yu , user 
> Date: 10/25/2015 11:30 PM
> Subject: Re: Spark scala REPL - Unable to create sqlContext
>
> Embedded Derby, which Hive/Spark SQL uses as the default metastore
> only supports a single user at a time. Till this issue is fixed, you
> could use another metastore that supports multiple concurrent users
> (e.g. networked derby or mysql) to get around it.
>
> On 25 October 2015 at 16:15, Ge, Yao (Y.)  wrote:
> Thanks. I wonder why this is not widely reported in the user forum.
> The REPL shell is basically broken in 1.5.0 and 1.5.1
> -Yao
>
> From: Ted Yu [mailto:yuzhih...@gmail.com]
> Sent: Sunday, October 25, 2015 12:01 PM
> To: Ge, Yao (Y.)
> Cc: user
> Subject: Re: Spark scala REPL - Unable to create sqlContext
>
> Have you taken a look at the fix for SPARK-11000 which is in the
> upcoming 1.6.0 release ?
>
> Cheers
>
> On Sun, Oct 25, 2015 at 8:42 AM, Yao  wrote:
> I have not been able to start Spark scala shell since 1.5 as it was not
able
> to create the sqlContext during the startup. It complains the
metastore_db
> is already locked: "Another instance of Derby may have already booted the
> database". The Derby log is attached.
>
> I only have this problem with starting the shell in yarn-client mode. I
am
> working with HDP2.2.6 which runs Hadoop 2.6.
>
> -Yao derby.log
>

>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-scala-REPL-Unable-to-create-sqlContext-
> tp25195.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

correct and fast way to stop streaming application

2015-10-26 Thread Krot Viacheslav
Hi all,

I wonder what is the correct way to stop streaming application if some job
failed?
What I have now:

val ssc = new StreamingContext

ssc.start()
try {
   ssc.awaitTermination()
} catch {
   case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
}

It works but one problem still exists - after job failed and before
streaming context is stopped it manages to start job for next batch. That
is not desirable for me.
It works like this because JobScheduler is an actor and after it reports
error, it goes on with next message that starts next batch job. While
ssc.awaitTermination() works in another thread and happens after next batch
starts.

Is there a way to stop before next job is submitted?
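
One pattern that narrows this window (a sketch, not taken from the thread; the stream
source and batch interval are illustrative): record the failure from the output action
and poll with awaitTerminationOrTimeout instead of blocking in awaitTermination, so the
stop is issued as soon as the flag is seen.

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val failed = new AtomicBoolean(false)
val ssc = new StreamingContext(new SparkConf().setAppName("stop-on-failure"), Seconds(10))
val stream = ssc.socketTextStream("localhost", 9999)   // illustrative input

stream.foreachRDD { rdd =>
  try {
    rdd.count()            // the real per-batch work goes here
  } catch {
    case e: Exception =>
      failed.set(true)     // remember the failure; the polling loop below reacts to it
  }
}

ssc.start()
// Poll once a second instead of blocking forever, so the context is stopped
// shortly after a batch fails.
while (!ssc.awaitTerminationOrTimeout(1000)) {
  if (failed.get()) {
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}

This does not remove the race entirely; a batch that is already queued may still start
before the stop takes effect.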


RE: Problem with make-distribution.sh

2015-10-26 Thread java8964
Maybe you need the Hive part?
Yong

Date: Mon, 26 Oct 2015 11:34:30 -0400
Subject: Problem with make-distribution.sh
From: yana.kadiy...@gmail.com
To: user@spark.apache.org

Hi folks, 
building spark instructions 
(http://spark.apache.org/docs/latest/building-spark.html) suggest that 

./make-distribution.sh --name custom-spark --tgz -Phadoop-2.4 -Pyarn

should produce a distribution similar to the ones found on the "Downloads" page.
I noticed that the tgz I built using the above command does not produce the 
datanucleus jars which are included in the "boxed" spark distributions. What is 
the best-practice advice here?
I would like my distribution to match the official one as closely as possible.
Thanks

Re: Problem with make-distribution.sh

2015-10-26 Thread Yana Kadiyska
thank you so much! You are correct. This is the second time I've made this
mistake :(

On Mon, Oct 26, 2015 at 11:36 AM, java8964  wrote:

> Maybe you need the Hive part?
>
> Yong
>
> --
> Date: Mon, 26 Oct 2015 11:34:30 -0400
> Subject: Problem with make-distribution.sh
> From: yana.kadiy...@gmail.com
> To: user@spark.apache.org
>
>
> Hi folks,
>
> building spark instructions (
> http://spark.apache.org/docs/latest/building-spark.html) suggest that
>
>
> ./make-distribution.sh --name custom-spark --tgz -Phadoop-2.4 -Pyarn
>
>
>
> should produce a distribution similar to the ones found on the "Downloads"
> page.
>
> I noticed that the tgz I built using the above command does not produce
> the datanucleus jars which are included in the "boxed" spark distributions.
> What is the best-practice advice here?
>
> I would like my distribution to match the official one as closely as
> possible.
>
> Thanks
>


Problem with make-distribution.sh

2015-10-26 Thread Yana Kadiyska
Hi folks,

building spark instructions (
http://spark.apache.org/docs/latest/building-spark.html) suggest that


./make-distribution.sh --name custom-spark --tgz -Phadoop-2.4 -Pyarn



should produce a distribution similar to the ones found on the "Downloads"
page.

I noticed that the tgz I built using the above command does not produce the
datanucleus jars which are included in the "boxed" spark distributions.
What is the best-practice advice here?

I would like my distribution to match the official one as closely as
possible.

Thanks


Spark Streaming: how to use StreamingContext.queueStream with existing RDD

2015-10-26 Thread Anfernee Xu
Hi,

Here's my situation, I have some kind of offline dataset and got them
loaded them into Spark as RDD, but I want to form a virtual data stream
feeding to Spark Streaming, my code looks like this


   // sort offline data by time, the dataset spans 2 hours
 1)  JavaRDD sortedByTime = offlineDataRDD.sortBy( );

   // compute a list of JavaRDD,  each element JavaRDD is hosting the data
in the same time
   // bucket, for example 5 minutes
  2) List virtualStreamRdd = ?

Queue queue = Queues.newLinkedBlockingQueue();
queue.addAll(virtualStreamRdd);

/*
 * Create DStream from the queue
 */

3) final JavaDStream rowDStream =
streamingContext.queueStream(queue);


Currently I'm stucking in 2), any suggestion is appreciated.

Thanks

-- 
--Anfernee
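
One way to build step 2 (a Scala sketch; the Java version would follow the same shape,
and the (timestamp, record) pair layout and 5-minute width are assumptions): filter the
offline RDD once per time bucket.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// offlineData holds (timestampMillis, record) pairs -- an assumed shape.
def bucketByTime[T: ClassTag](offlineData: RDD[(Long, T)],
                              bucketMs: Long = 5 * 60 * 1000L): Seq[RDD[T]] = {
  val minTs = offlineData.keys.min()
  val maxTs = offlineData.keys.max()
  (minTs to maxTs by bucketMs).map { start =>
    val end = start + bucketMs
    // One RDD per bucket; each becomes one "batch" of the queue stream.
    offlineData.filter { case (ts, _) => ts >= start && ts < end }.values
  }
}

Each RDD in the resulting sequence can then be added to the queue passed to queueStream;
with oneAtATime left at its default, one bucket is consumed per streaming batch.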


Re: correct and fast way to stop streaming application

2015-10-26 Thread varun sharma
+1, wanted to do same.

On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav 
wrote:

> Hi all,
>
> I wonder what is the correct way to stop streaming application if some job
> failed?
> What I have now:
>
> val ssc = new StreamingContext
> 
> ssc.start()
> try {
>ssc.awaitTermination()
> } catch {
>case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
> }
>
> It works but one problem still exists - after job failed and before
> streaming context is stopped it manages to start job for next batch. That
> is not desirable for me.
> It works like this because JobScheduler is an actor and after it reports
> error, it goes on with next message that starts next batch job. While
> ssc.awaitTermination() works in another thread and happens after next batch
> starts.
>
> Is there a way to stop before next job is submitted?
>



-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Problem with make-distribution.sh

2015-10-26 Thread Sean Owen
I don't think the page suggests that gives you any of the tarballs on the
downloads page, and -Phive does not by itself do so either.

On Mon, Oct 26, 2015 at 4:58 PM, Ted Yu  wrote:

> I logged SPARK-11318 with a PR.
>
> I verified that by adding -Phive the datanucleus jars are included:
>
> tar tzvf spark-1.6.0-SNAPSHOT-bin-custom-spark.tgz | grep datanucleus
> -rw-r--r-- hbase/hadoop 1890075 2015-10-26 09:52
> spark-1.6.0-SNAPSHOT-bin-custom-spark/lib/datanucleus-core-3.2.10.jar
> -rw-r--r-- hbase/hadoop339666 2015-10-26 09:52
> spark-1.6.0-SNAPSHOT-bin-custom-spark/lib/datanucleus-api-jdo-3.2.6.jar
> -rw-r--r-- hbase/hadoop   1809447 2015-10-26 09:52
> spark-1.6.0-SNAPSHOT-bin-custom-spark/lib/datanucleus-rdbms-3.2.9.jar
>
> Cheers
>
> On Mon, Oct 26, 2015 at 8:52 AM, Yana Kadiyska 
> wrote:
>
>> thank you so much! You are correct. This is the second time I've made
>> this mistake :(
>>
>> On Mon, Oct 26, 2015 at 11:36 AM, java8964  wrote:
>>
>>> Maybe you need the Hive part?
>>>
>>> Yong
>>>
>>> --
>>> Date: Mon, 26 Oct 2015 11:34:30 -0400
>>> Subject: Problem with make-distribution.sh
>>> From: yana.kadiy...@gmail.com
>>> To: user@spark.apache.org
>>>
>>>
>>> Hi folks,
>>>
>>> building spark instructions (
>>> http://spark.apache.org/docs/latest/building-spark.html) suggest that
>>>
>>>
>>> ./make-distribution.sh --name custom-spark --tgz -Phadoop-2.4 -Pyarn
>>>
>>>
>>>
>>> should produce a distribution similar to the ones found on the
>>> "Downloads" page.
>>>
>>> I noticed that the tgz I built using the above command does not produce
>>> the datanucleus jars which are included in the "boxed" spark distributions.
>>> What is the best-practice advice here?
>>>
>>> I would like my distribution to match the official one as closely as
>>> possible.
>>>
>>> Thanks
>>>
>>
>>
>


Re: Spark Streaming: how to use StreamingContext.queueStream with existing RDD

2015-10-26 Thread Dean Wampler
Check out StreamingContext.queueStream (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext
)

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Mon, Oct 26, 2015 at 11:16 AM, Anfernee Xu  wrote:

> Hi,
>
> Here's my situation, I have some kind of offline dataset and got them
> loaded them into Spark as RDD, but I want to form a virtual data stream
> feeding to Spark Streaming, my code looks like this
>
>
>// sort offline data by time, the dataset spans 2 hours
>  1)  JavaRDD sortedByTime = offlineDataRDD.sortBy( );
>
>// compute a list of JavaRDD,  each element JavaRDD is hosting the data
> in the same time
>// bucket, for example 5 minutes
>   2) List virtualStreamRdd = ?
>
> Queue queue = Queues.newLinkedBlockingQueue();
> queue.addAll(virtualStreamRdd);
>
> /*
>  * Create DStream from the queue
>  */
>
> 3) final JavaDStream rowDStream =
> streamingContext.queueStream(queue);
>
>
> Currently I'm stucking in 2), any suggestion is appreciated.
>
> Thanks
>
> --
> --Anfernee
>


Re: [Spark Streaming] How do we reset the updateStateByKey values.

2015-10-26 Thread Uthayan Suthakar
Thank you, Adrian, for your reply. I've already managed to resolve this issue,
and coincidentally it is similar to the solution that you've proposed.

Cheers,

Uthay.

On 26 October 2015 at 10:41, Adrian Tanase  wrote:

> Have you considered union-ing the 2 streams? Basically you can consider
> them as 2 “message types” that your update function can consume (e.g.
> implement a common interface):
>
>- regularUpdate
>- resetStateUpdate
>
> Inside your updateStateByKey you can check if any of the messages in the
> list of updates is an resetState message. If now, continue summing the
> others.
>
> I can provide scala samples, my java is beyond rusty :)
>
> -adrian
>
> From: Uthayan Suthakar
> Date: Friday, October 23, 2015 at 2:10 PM
> To: Sander van Dijk
> Cc: user
> Subject: Re: [Spark Streaming] How do we reset the updateStateByKey
> values.
>
> Hi Sander,
>
> Thank you for your very informative email. From your email, I've learned a
> quite a bit.
>
> >>>Is the condition determined somehow from the data coming through
> streamLogs, and is newData streamLogs again (rather than a whole data
> source?)
>
> No, they are two different Streams. I have two stream receivers, one of
> which sends event regularly and the other is not so regular (this data is
> computed by another application and stored into HDFS). What I'm trying to
> do is pick up the data from HDFS and overwrite the Stream's state. Hence
> the overwriting should only take place if there were new files in HDFS.
>
> So we have two different RDDs. If no file is found in HDFS, it will simply
> read the regular stream, compute and update the state(1) and output the
> result. If there is a file found in HDFS, then it should overwrite the
> state (1) with the data found from HDFS so the new events from the regular
> stream will carry on with the new overwritten state.
>
> I managed to get most of it done, but only having the issue with
> overwriting the state.
>
>
>
> On 22 October 2015 at 19:35, Sander van Dijk  wrote:
>
>> I don't think it is possible in the way you try to do it. It is important
>> to remember that the statements you mention only set up the stream stages,
>> before the stream is actually running. Once it's running, you cannot
>> change, remove or add stages.
>>
>> I am not sure how you determine your condition and what the actual change
>> should be when that condition is met: you say you want a different update
>> function but then give a statement with the same update function but a
>> different source stream). Is the condition determined somehow from the data
>> coming through streamLogs, and is newData basically streamLogs again
>> (rather than a whole data source?). In that case I can think of 3 things to
>> try:
>>
>> - if the condition you switch on can be determined independently from
>> every item in streamLogs, you can simply do an if/else inside
>> updateResultsStream to change the method that you determine your state
>> - if this is not the case, but you can determine when to switch your
>> condition for each key independently, you can extend your state type to
>> also keep track of your condition: rather than using
>> JavaPairDStream you make updatedResultsState a
>> JavaPairDStream> (assuming you have some
>> class Pair), and you make updateResultsStream update and check the state of
>> the boolean.
>> - finally, you can have a separate state stream that keeps track of your
>> condition globally, then join that with you main stream and use that to
>> update state. Something like:
>>
>> // determineCondition should result in a reduction to a single item that
>> signals whether the condition is met in the current batch,
>> updateContitionState should remember that
>> conditionStateStream =
>> streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState)
>>
>>
>> // addCondition gets RDDs from streamLogs and  single-item RDDs with the
>> condition state and should add that state to each item in the streamLogs RDD
>> joinedStream = streamLogs.transformWith(conditionStateStream,
>> addCondition)
>>
>> // This is similar to the extend state type of the previous idea, but now
>> your condition state is determined globally rather than per log entry
>> updatedResultsState = joinedStream.updateStateByKey(updateResultsStream)
>>
>> I hope this applies to your case and that it makes sense, my Java is a
>> bit rusty :) and perhaps others can suggest better spark streaming methods
>> that can be used, but hopefully the idea is clear.
>>
>> Sander
>>
>> On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar <
>> uthayan.sutha...@gmail.com> wrote:
>>
>>> Hello guys,
>>>
>>> I have a stream job that will carryout computations and update the state
>>> (SUM the value). At some point, I would like to reset the state. I could
>>> drop the state by setting 'None' but I don't want to drop it. I would like
>>> to keep the state but update the 
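
A minimal Scala sketch of that union-based approach (the message types and field names
are illustrative, and checkpointing, which updateStateByKey requires, is omitted):

import org.apache.spark.streaming.dstream.DStream

sealed trait Msg { def key: String }
case class Regular(key: String, value: Long) extends Msg   // normal incremental update
case class Reset(key: String, value: Long) extends Msg     // state computed externally, e.g. read from HDFS

def summed(regular: DStream[Msg], resets: DStream[Msg]): DStream[(String, Long)] =
  regular.union(resets)
    .map(m => (m.key, m))
    .updateStateByKey[Long] { (msgs: Seq[Msg], state: Option[Long]) =>
      // A reset message overwrites the state; regular updates from the
      // same batch are then added on top of the new base.
      val base = msgs.collectFirst { case Reset(_, v) => v }.orElse(state).getOrElse(0L)
      Some(base + msgs.collect { case Regular(_, v) => v }.sum)
    }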

rdd conversion

2015-10-26 Thread Yasemin Kaya
Hi,

I have *JavaRDD<List<Map<Integer, ArrayList<String>>>>* and I want to
convert every map to a pair RDD, i.e.
*JavaPairRDD<Integer, ArrayList<String>>*.

There is a loop over the list to get each indexed map, but when I run the code
below, it returns me only one pair.

JavaPairRDD<Integer, ArrayList<String>> mapToRDD =
    IdMapValues.mapToPair(new PairFunction<List<Map<Integer, ArrayList<String>>>,
        Integer, ArrayList<String>>() {

      @Override
      public Tuple2<Integer, ArrayList<String>> call(
          List<Map<Integer, ArrayList<String>>> arg0) throws Exception {
        Tuple2<Integer, ArrayList<String>> t = null;
        for (int i = 0; i < arg0.size(); i++) {
          for (Map.Entry<Integer, ArrayList<String>> entry : arg0.get(i).entrySet()) {
            t = new Tuple2<Integer, ArrayList<String>>(entry.getKey(),
                entry.getValue());
          }
        }

        return t;
      }
    });

As you can see i am using java. Give me some clue .. Thanks.

Best,
yasemin

-- 
hiç ender hiç


Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-26 Thread Koert Kuipers
it seems HadoopFsRelation keeps track of all part files (instead of just
the data directories). i believe this has something to do with parquet
footers but i didnt bother to look more into it. but yet the result is that
driver side it:
1) tries to keep track of all part files in a Map[Path, FileStatus]
2) it also tries to serialize the paths to all part files (instead of just
the data directories) in the Hadoop JobConf object (or create a JobConf per
part file in case of spark-avro)

i agree this approach is not scalable... i ran into it myself with
spark-avro where a job simply never gets started on a large number of part
files. i am still trying to understand better why all the part files need
to be tracked driver side but i am pretty sure i plan to remove this in our
inhouse spark version.

i also noticed code that actually assumes the schema for every part file
can be different (even within the same partition, which seems unlikely,
except if you use insert i guess), and the code tries to reconcile the
schema between all part files... i also do not think this is scalable.

sorry this became a bit of a rant


On Mon, Oct 26, 2015 at 9:56 AM, Jerry Lam  wrote:

> Hi Fengdong,
>
> Why it needs more memory at the driver side when there are many
> partitions? It seems the implementation can only support use cases for a
> dozen of partition when it is over 100, it fails apart. It is also quite
> slow to initialize the loading of partition tables when the number of
> partition is over 100.
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 26 Oct, 2015, at 2:50 am, Fengdong Yu  wrote:
>
> How many partitions you generated?
> if Millions generated, then there is a huge memory consumed.
>
>
>
>
>
> On Oct 26, 2015, at 10:58 AM, Jerry Lam  wrote:
>
> Hi guys,
>
> I mentioned that the partitions are generated so I tried to read the
> partition data from it. The driver is OOM after few minutes. The stack
> trace is below. It looks very similar to the the jstack above (note on the
> refresh method). Thanks!
>
> Name: java.lang.OutOfMemoryError
> Message: GC overhead limit exceeded
> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
> java.lang.StringBuilder.append(StringBuilder.java:132)
> org.apache.hadoop.fs.Path.toString(Path.java:384)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)org.apache.spark.sql.sources.HadoopFsRelation.org
>  
> $apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)org.apache.spark.sql.sources.HadoopFsRelation.org
>  
> $apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org
>  
> $apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org
>  
> $apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> scala.Option.getOrElse(Option.scala:120)
> 
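
One way to sidestep discovering every part file up front when a query only needs one
partition (a sketch with an illustrative path, not something proposed in the thread):
point the reader at that partition's directory instead of the table root.

// Reads only the customer_id=1234 partition; the other partition
// directories and their part files are never listed.
val events = sqlContext.read.parquet("hdfs:///warehouse/events/customer_id=1234")
events.registerTempTable("customer_1234_events")
sqlContext.sql("SELECT count(*) FROM customer_1234_events").show()

The trade-off is that the partition column itself is no longer present in the resulting
DataFrame, since it is encoded only in the directory name.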

Re: Problem with make-distribution.sh

2015-10-26 Thread Ted Yu
I logged SPARK-11318 with a PR.

I verified that by adding -Phive the datanucleus jars are included:

tar tzvf spark-1.6.0-SNAPSHOT-bin-custom-spark.tgz | grep datanucleus
-rw-r--r-- hbase/hadoop 1890075 2015-10-26 09:52
spark-1.6.0-SNAPSHOT-bin-custom-spark/lib/datanucleus-core-3.2.10.jar
-rw-r--r-- hbase/hadoop339666 2015-10-26 09:52
spark-1.6.0-SNAPSHOT-bin-custom-spark/lib/datanucleus-api-jdo-3.2.6.jar
-rw-r--r-- hbase/hadoop   1809447 2015-10-26 09:52
spark-1.6.0-SNAPSHOT-bin-custom-spark/lib/datanucleus-rdbms-3.2.9.jar

Cheers

On Mon, Oct 26, 2015 at 8:52 AM, Yana Kadiyska 
wrote:

> thank you so much! You are correct. This is the second time I've made this
> mistake :(
>
> On Mon, Oct 26, 2015 at 11:36 AM, java8964  wrote:
>
>> Maybe you need the Hive part?
>>
>> Yong
>>
>> --
>> Date: Mon, 26 Oct 2015 11:34:30 -0400
>> Subject: Problem with make-distribution.sh
>> From: yana.kadiy...@gmail.com
>> To: user@spark.apache.org
>>
>>
>> Hi folks,
>>
>> building spark instructions (
>> http://spark.apache.org/docs/latest/building-spark.html) suggest that
>>
>>
>> ./make-distribution.sh --name custom-spark --tgz -Phadoop-2.4 -Pyarn
>>
>>
>>
>> should produce a distribution similar to the ones found on the
>> "Downloads" page.
>>
>> I noticed that the tgz I built using the above command does not produce
>> the datanucleus jars which are included in the "boxed" spark distributions.
>> What is the best-practice advice here?
>>
>> I would like my distribution to match the official one as closely as
>> possible.
>>
>> Thanks
>>
>
>


Re: rdd conversion

2015-10-26 Thread Ted Yu
bq.  t = new Tuple2  (entry.getKey(),
entry.getValue());

The return statement is outside the loop.
That was why you got one RDD.

On Mon, Oct 26, 2015 at 9:40 AM, Yasemin Kaya  wrote:

> Hi,
>
> I have *JavaRDD>>* and I want to
> convert every map to pairrdd, i mean
> * JavaPairRDD. *
>
> There is a loop in list to get the indexed map, when I write code below,
> it returns me only one rdd.
>
> JavaPairRDD mapToRDD =
>  IdMapValues.mapToPair(new
> PairFunction>, Integer,
> ArrayList>() {
>
> @Override
> public Tuple2 call(
> List> arg0)
> throws Exception {
> Tuple2 t = null;
> for(int i=0; i for (Map.Entry entry :arg0.get(i).entrySet()) {
> t = new Tuple2  (entry.getKey(),
> entry.getValue());
> }
> }
>
> return t;
> }
> });
>
> As you can see i am using java. Give me some clue .. Thanks.
>
> Best,
> yasemin
>
> --
> hiç ender hiç
>


Re: rdd conversion

2015-10-26 Thread Yasemin Kaya
But if I put the return inside the loop, the method still wants a return
statement.

2015-10-26 19:09 GMT+02:00 Ted Yu :

> bq.  t = new Tuple2  (entry.getKey(),
> entry.getValue());
>
> The return statement is outside the loop.
> That was why you got one RDD.
>
> On Mon, Oct 26, 2015 at 9:40 AM, Yasemin Kaya  wrote:
>
>> Hi,
>>
>> I have *JavaRDD>>* and I want to
>> convert every map to pairrdd, i mean
>> * JavaPairRDD. *
>>
>> There is a loop in list to get the indexed map, when I write code below,
>> it returns me only one rdd.
>>
>> JavaPairRDD mapToRDD =
>>  IdMapValues.mapToPair(new
>> PairFunction>, Integer,
>> ArrayList>() {
>>
>> @Override
>> public Tuple2 call(
>> List> arg0)
>> throws Exception {
>> Tuple2 t = null;
>> for(int i=0; i> for (Map.Entry entry :arg0.get(i).entrySet())
>> {
>> t = new Tuple2  (entry.getKey(),
>> entry.getValue());
>> }
>> }
>>
>> return t;
>> }
>> });
>>
>> As you can see i am using java. Give me some clue .. Thanks.
>>
>> Best,
>> yasemin
>>
>> --
>> hiç ender hiç
>>
>
>


-- 
hiç ender hiç
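
The underlying issue is that mapToPair emits exactly one pair per input element, so a
single return statement can never yield one pair per map entry; a flatMap-style
operation (flatMapToPair in the Java API) is what emits many. A Scala sketch of the
same idea, assuming the element type from the original question:

import org.apache.spark.rdd.RDD

// Each input element is a list of maps from an id to its values.
def toPairs(rows: RDD[Seq[Map[Int, Seq[String]]]]): RDD[(Int, Seq[String])] =
  rows.flatMap { maps =>
    for {
      m <- maps                // every map in the list
      (id, values) <- m        // every entry in that map
    } yield (id, values)       // one output pair per entry
  }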


Re: Concurrent execution of actions within a driver

2015-10-26 Thread Rishitesh Mishra
Spark executes tasks when an action is called. An action is broken down into
multiple tasks. Tasks from different actions run in either FIFO or FAIR mode,
depending on spark.scheduler.mode.
Of course, to get the benefit of FAIR scheduling, the two actions should be
called from different threads.

On Mon, Oct 26, 2015 at 5:01 PM, Fengdong Yu 
wrote:

> Not parallel.
>
> Spark only executes tasks on an action ('collect' here).
>
> rdd1.collect and rdd2.collect are executed sequentially, so Spark executes
> the two tasks one by one.
>
>
>
>
> On Oct 26, 2015, at 7:26 PM, praveen S  wrote:
>
> Does spark run different actions of an rdd within a driver in parallel
> also?
>
> Let's say
> class Driver{
>
> val rdd1 = sc.textFile("... ")
> val rdd2 = sc.textFile("")
> rdd1.collect // Action 1
> rdd2.collect // Action 2
>
> }
>
> Does Spark run Action 1 & 2 in parallel? (Some kind of a pass through
> the driver code and then start the execution?)
>
> If not, then is using threads safe for independent actions/RDDs?
>
>
>


-- 

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra
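
A minimal Scala sketch of submitting two independent actions from separate threads so
the FAIR scheduler can actually interleave them (paths are illustrative):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallel-actions")
  .set("spark.scheduler.mode", "FAIR")   // otherwise jobs are scheduled FIFO
val sc = new SparkContext(conf)

val rdd1 = sc.textFile("/data/input1")
val rdd2 = sc.textFile("/data/input2")

// Each action is submitted from its own thread, so the two jobs can be
// scheduled concurrently instead of strictly one after the other.
val f1 = Future { rdd1.count() }
val f2 = Future { rdd2.count() }

println(s"counts: ${Await.result(f1, 10.minutes)}, ${Await.result(f2, 10.minutes)}")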


Re: Problem with make-distribution.sh

2015-10-26 Thread Yana Kadiyska
@Sean, here is where I think it's a little misleading (underlining is mine):

Building a Runnable Distribution

To create a *Spark distribution like those distributed by the Spark
Downloads  page*, and that is laid
out so as to be runnable, use make-distribution.sh in the project root
directory. It can be configured with Maven profile settings and so on like
the direct Maven build. Example:


Agreed that "like" doesn't necessarily imply "exactly the same". On the
other hand, if I go to the download page all I select is a hadoop version
and distribution, so it's not super-intuitive that -Phive was used to
produce these. I don't have a strong opinion on whether this should be a
fix to the script or the docs but now that it's bitten me twice I'm very
appreciative of either :)

Thanks

On Mon, Oct 26, 2015 at 1:29 PM, Sean Owen  wrote:

> I don't think the page suggests that gives you any of the tarballs on the
> downloads page, and -Phive does not by itself do so either.
>
> On Mon, Oct 26, 2015 at 4:58 PM, Ted Yu  wrote:
>
>> I logged SPARK-11318 with a PR.
>>
>> I verified that by adding -Phive the datanucleus jars are included:
>>
>> tar tzvf spark-1.6.0-SNAPSHOT-bin-custom-spark.tgz | grep datanucleus
>> -rw-r--r-- hbase/hadoop 1890075 2015-10-26 09:52
>> spark-1.6.0-SNAPSHOT-bin-custom-spark/lib/datanucleus-core-3.2.10.jar
>> -rw-r--r-- hbase/hadoop339666 2015-10-26 09:52
>> spark-1.6.0-SNAPSHOT-bin-custom-spark/lib/datanucleus-api-jdo-3.2.6.jar
>> -rw-r--r-- hbase/hadoop   1809447 2015-10-26 09:52
>> spark-1.6.0-SNAPSHOT-bin-custom-spark/lib/datanucleus-rdbms-3.2.9.jar
>>
>> Cheers
>>
>> On Mon, Oct 26, 2015 at 8:52 AM, Yana Kadiyska 
>> wrote:
>>
>>> thank you so much! You are correct. This is the second time I've made
>>> this mistake :(
>>>
>>> On Mon, Oct 26, 2015 at 11:36 AM, java8964  wrote:
>>>
 Maybe you need the Hive part?

 Yong

 --
 Date: Mon, 26 Oct 2015 11:34:30 -0400
 Subject: Problem with make-distribution.sh
 From: yana.kadiy...@gmail.com
 To: user@spark.apache.org


 Hi folks,

 building spark instructions (
 http://spark.apache.org/docs/latest/building-spark.html) suggest that


 ./make-distribution.sh --name custom-spark --tgz -Phadoop-2.4 -Pyarn



 should produce a distribution similar to the ones found on the
 "Downloads" page.

 I noticed that the tgz I built using the above command does not produce
 the datanucleus jars which are included in the "boxed" spark distributions.
 What is the best-practice advice here?

 I would like my distribution to match the official one as closely as
 possible.

 Thanks

>>>
>>>
>>
>


Broadcast table

2015-10-26 Thread Younes Naguib
Hi all,

I use the thrift server, and I cache a table using "cache table mytab".
Is there any sql to broadcast it too?

Thanks
Younes Naguib
Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | 
younes.nag...@tritondigital.com 
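
As far as I know there is no SQL statement for this; from the DataFrame side, the
broadcast hint introduced in 1.5 can mark the small table when joining (a sketch,
assuming sqlContext points at the same metastore and "big" is an illustrative table
name):

import org.apache.spark.sql.functions.broadcast

val small = sqlContext.table("mytab")   // the table cached above
val big   = sqlContext.table("big")

// broadcast() hints the planner to ship "mytab" to every executor and
// plan a broadcast (map-side) join instead of a shuffle join.
val joined = big.join(broadcast(small), big("id") === small("id"))
joined.show()

Alternatively, spark.sql.autoBroadcastJoinThreshold controls the size below which Spark
SQL broadcasts a join side automatically.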



Re: Running in cluster mode causes native library linking to fail

2015-10-26 Thread Bernardo Vecchia Stein
Hello guys,

After lots of time trying to make things work, I finally found what was
causing the issue:

I was calling the function from the library inside a map function, which
caused the code inside it to be run in executors instead of the driver.
Since only the driver had loaded the library, the executors would then give
an error. The tricky part is that the same error message was being
replicated in both the driver's and the executors' logs, which led me to believe
it was a global error. Only after testing by running things *only* on the
driver did I discover that everything worked.

For future reference: if you are running into this issue, please check if
you are also loading the library on the executors! In the case of my map
example, the fix was to create a wrapper function that 1) loaded libraries
and then 2) called functions within the library. After that, map things to
this wrapper function. This way, you ensure every executor also loads the
libraries.

I'd like to thank Prajod, Renato and Deenar for the help.

Bernardo
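
A minimal Scala sketch of that wrapper pattern (the library name is illustrative): keep
the load behind a lazy val in an object, so it runs once per JVM on whichever executor
executes the task.

import org.apache.spark.rdd.RDD

object NativeLoader {
  // Evaluated at most once per JVM; every executor that runs a task
  // triggers it before touching the native code.
  lazy val ready: Boolean = {
    System.loadLibrary("mynative")   // expects libmynative.so on each executor's java.library.path
    true
  }
}

def run(input: RDD[String]): RDD[String] =
  input.mapPartitions { iter =>
    NativeLoader.ready               // force the load on the executor, not only on the driver
    iter.map { s =>
      // ... call into the native library here ...
      s
    }
  }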

On 15 October 2015 at 03:27,  wrote:

> Forwarding to the group, in case someone else has the same error. Just
> found out that I did not reply to the group in my original reply.
>
>
>
> *From:* Prajod S Vettiyattil (WT01 - BAS)
> *Sent:* 15 October 2015 11:45
> *To:* 'Bernardo Vecchia Stein' 
> *Subject:* RE: Running in cluster mode causes native library linking to
> fail
>
>
>
> Hi,
>
>
>
> Also try the path settings given here:
> http://stackoverflow.com/questions/12279833/videocapture-opencv-2-4-2-error-in-windows/29278322#29278322
>
>
>
> Forgot to add this link in my response earlier:
>
> https://blogs.oracle.com/darcy/entry/purging_ld_library_path
>
> http://www.oracle.com/technetwork/java/javase/jdk7-relnotes-418459.html
>
>
>
> So from java 7, LD_LIBRARY_PATH is ignored. This is for Linux and Solaris.
> And probably for all other Unix derivatives.
>
>
>
> Also check : System.loadLibrary() should be inside a static {  } block.
> Please check for its syntax on the internet. The loadlibrary function has
> to be called during class load time. That is why the static block is
> required.
>
>
>
> What is your ?
>
> 1.  Spark version
>
> 2.  OS type and version
>
> 3.  Library that you are trying to load.
>
>
>
> [I was using OpenCV. Had to go through many trials to get it working
> consistently. Initially, it would work only on dev environment(windows) but
> not on Ubuntu. Its been a few months. There is a stackoverflow answer I
> have given regarding this:
> http://stackoverflow.com/questions/12279833/videocapture-opencv-2-4-2-error-in-windows/29278322#29278322
> ]
>
>
>
> Regards,
>
> Prajod
>
>
>
> *From:* Bernardo Vecchia Stein [mailto:bernardovst...@gmail.com]
> *Sent:* 15 October 2015 00:36
> *To:* Prajod S Vettiyattil (WT01 - BAS) 
> *Subject:* Re: Running in cluster mode causes native library linking to
> fail
>
>
>
> Hello Prajod,
>
> Thanks for your reply! I am also using the standalone cluster manager. I
> do not build the jars in Eclipse and neither use Maven. They are built with
> sbt by hand.
>
> I was setting LD_LIBRARY_PATH and LIBRARY_PATH to point to the paths with
> the library. When I didn't set them and set only PATH instead, spark would
> just not find the libraries (it was another error). I'm not sure what
> version you are using, but it appears I do have to set LD_LIBRARY_PATH in
> order to make things work.
>
> I tried a simpler approach using System.load() with a specific path to the
> library, so I don't have to deal with these paths. However, I still get the
> same error when executing in cluster mode (exactly the same error). Do you
> have any idea why that might be failing?
>
> Thank you again for your attention,
>
> Bernardo
>
>
>
> On 14 October 2015 at 03:30,  wrote:
>
> Hi,
>
>
>
> I have successfully made this working using the “standalone”cluster
> manager. Not tried with Mesos or YARN.
>
>
>
> Which of these cluster managers are you using ?
> https://spark.apache.org/docs/1.1.0/cluster-overview.html
>
> ·Standalone
>  – a simple
> cluster manager included with Spark that makes it easy to set up a cluster.
>
> ·Apache Mesos
>  – a general
> cluster manager that can also run Hadoop MapReduce and service applications.
>
> ·Hadoop YARN
>  – the resource
> manager in Hadoop 2.
>
>
>
> I have run Spark using Scala in cluster mode, using the standalone cluster
> manager. It took a lot of effort. Also I think  that “UnsatisfiedLinkError”
> means that your .so could not be found.
>
>
>
> There are two settings to make this work:
>
> 1.  “native library location” in the Eclipse configuration.(my jar
> for spark_submit () was