Optimize the first map reduce of DStream

2015-03-24 Thread Bin Wang
Hi,

I'm learning Spark, and I think there is room to optimize the current
streaming implementation. Correct me if I'm wrong.

The current streaming implementation puts the data of one batch into memory
(as an RDD), but that does not seem necessary.

For example, if I want to count the lines that contain the word "Spark", I
just need to map each line to a flag indicating whether it contains the word,
then reduce with a sum function. After that, the line is no longer useful and
there is no need to keep it in memory.
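
For concreteness, here is a minimal sketch of that job against the DStream API
(assuming a socket text stream on port 9999; the names and the 10-second batch
interval are just illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CountSparkLines")
val ssc = new StreamingContext(conf, Seconds(10))        // 10-second batches
val lines = ssc.socketTextStream("localhost", 9999)
// map each line to 1 or 0, then sum within the batch
val matches = lines.map(line => if (line.contains("Spark")) 1L else 0L)
                   .reduce(_ + _)
matches.print()                                          // one count per batch
ssc.start()
ssc.awaitTermination()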

That is to say, if the DStream only has a map and/or reduce operation on it,
it is not necessary to keep all the batch data in memory; something like a
pipeline should be enough.

Is it difficult to implement on top of the current implementation?

Thanks.

---
Bin Wang


Re: Optimize the first map reduce of DStream

2015-03-24 Thread Arush Kharbanda
The block size is configurable, so I think you can reduce the block interval
to keep each block in memory only for a limited interval.
Is that what you are looking for?
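
For reference, the knob I mean is spark.streaming.blockInterval, in
milliseconds (the default is 200 ms in this version, if I recall correctly):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyStreamingApp")
  .set("spark.streaming.blockInterval", "200")  // ms per block; smaller values mean more, shorter-lived blocks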

On Tue, Mar 24, 2015 at 1:38 PM, Bin Wang  wrote:

> Hi,
>
> I'm learning Spark and I find there could be some optimize for the current
> streaming implementation. Correct me if I'm wrong.
>
> The current streaming implementation put the data of one batch into memory
> (as RDD). But it seems not necessary.
>
> For example, if I want to count the lines which contains word "Spark", I
> just need to map every line to see if it contains word, then reduce it with
> a sum function. After that, this line is no longer useful to keep it in
> memory.
>
> That is said, if the DStream only have one map and/or reduce operation on
> it. It is not necessary to keep all the batch data in the memory. Something
> like a pipeline should be OK.
>
> Is it difficult to implement on top of the current implementation?
>
> Thanks.
>
> ---
> Bin Wang
>



-- 

[image: Sigmoid Analytics] 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Optimize the first map reduce of DStream

2015-03-24 Thread Zoltán Zvara
There is a BlockGenerator on each worker node next to the
ReceiverSupervisorImpl, which generates Blocks out of an ArrayBuffer in
each interval (block_interval). These Blocks are passed to the
ReceiverSupervisorImpl, which puts them into the BlockManager
for storage. BlockInfos are passed to the driver. Mini-batches are created
by the JobGenerator component on the driver every batch_interval. I guess
what you are looking for is provided by a continuous model like Flink's. We
create mini-batches to provide fault tolerance.

Zvara Zoltán



mail, hangout, skype: zoltan.zv...@gmail.com

mobile, viber: +36203129543

bank: 10918001-0021-50480008

address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a

elte: HSKSJZ (ZVZOAAI.ELTE)

2015-03-24 11:55 GMT+01:00 Arush Kharbanda :

> The block size is configurable and that way I think you can reduce the
> block interval, to keep the block in memory only for the limiter interval?
> Is that what you are looking for?
>
> On Tue, Mar 24, 2015 at 1:38 PM, Bin Wang  wrote:
>
> > Hi,
> >
> > I'm learning Spark and I find there could be some optimize for the
> current
> > streaming implementation. Correct me if I'm wrong.
> >
> > The current streaming implementation put the data of one batch into
> memory
> > (as RDD). But it seems not necessary.
> >
> > For example, if I want to count the lines which contains word "Spark", I
> > just need to map every line to see if it contains word, then reduce it
> with
> > a sum function. After that, this line is no longer useful to keep it in
> > memory.
> >
> > That is said, if the DStream only have one map and/or reduce operation on
> > it. It is not necessary to keep all the batch data in the memory.
> Something
> > like a pipeline should be OK.
> >
> > Is it difficult to implement on top of the current implementation?
> >
> > Thanks.
> >
> > ---
> > Bin Wang
> >
>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>


Any guidance on when to back port and how far?

2015-03-24 Thread Sean Owen
So far, my rule of thumb has been:

- Don't back-port new features or improvements in general, only bug fixes
- Don't back-port minor bug fixes
- Back-port bug fixes that seem important enough to not wait for the
next minor release
- Back-port site doc changes to the release most likely to go out
next, to make it a part of the next site publish

But, how far should back-ports go, in general? If the last minor
release was 1.N, then to branch 1.N surely. Farther back is a question
of expectation for support of past minor releases. Given the pace of
change and time available, I assume there's not much support for
continuing to use release 1.(N-1) and very little for 1.(N-2).

Concretely: does anyone expect a 1.1.2 release ever? a 1.2.2 release?
It'd be good to hear the received wisdom explicitly.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Optimize the first map reduce of DStream

2015-03-24 Thread Bin Wang
I'm not looking to limit the block size.

Here is another example. Say we want to count the lines from the stream in
one hour. In a normal program, we may write it like this:

int sum = 0
while (line = getFromStream()) {
  store(line)  // store the line into storage instead of memory
  sum++
}

This could be seen as a reduce. The only memory used here is the variable
named "line"; there is no need to keep all the lines in memory (assuming the
lines are not used anywhere else). If we want to provide fault tolerance, we
could just write the lines to storage instead of holding them in memory. Could
Spark Streaming work this way? Does Flink work like this?





On Tue, Mar 24, 2015 at 7:04 PM Zoltán Zvara  wrote:

> There is a BlockGenerator on each worker node next to the
> ReceiverSupervisorImpl, which generates Blocks out of an ArrayBuffer in
> each interval (block_interval). These Blocks are passed to
> ReceiverSupervisorImpl, which throws these blocks to into the BlockManager
> for storage. BlockInfos are passed to the driver. Mini-batches are created
> by the JobGenerator component on the driver each batch_interval. I guess
> what you are looking for is provided by a continuous model like Flink's. We
> are creating mini-batches to provide fault tolerance.
>
> Zvara Zoltán
>
>
>
> mail, hangout, skype: zoltan.zv...@gmail.com
>
> mobile, viber: +36203129543
>
> bank: 10918001-0021-50480008
>
> address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a
>
> elte: HSKSJZ (ZVZOAAI.ELTE)
>
> 2015-03-24 11:55 GMT+01:00 Arush Kharbanda :
>
>> The block size is configurable and that way I think you can reduce the
>> block interval, to keep the block in memory only for the limiter interval?
>> Is that what you are looking for?
>>
>> On Tue, Mar 24, 2015 at 1:38 PM, Bin Wang  wrote:
>>
>> > Hi,
>> >
>> > I'm learning Spark and I find there could be some optimize for the
>> current
>> > streaming implementation. Correct me if I'm wrong.
>> >
>> > The current streaming implementation put the data of one batch into
>> memory
>> > (as RDD). But it seems not necessary.
>> >
>> > For example, if I want to count the lines which contains word "Spark", I
>> > just need to map every line to see if it contains word, then reduce it
>> with
>> > a sum function. After that, this line is no longer useful to keep it in
>> > memory.
>> >
>> > That is said, if the DStream only have one map and/or reduce operation
>> on
>> > it. It is not necessary to keep all the batch data in the memory.
>> Something
>> > like a pipeline should be OK.
>> >
>> > Is it difficult to implement on top of the current implementation?
>> >
>> > Thanks.
>> >
>> > ---
>> > Bin Wang
>> >
>>
>>
>>
>> --
>>
>> [image: Sigmoid Analytics] 
>>
>> *Arush Kharbanda* || Technical Teamlead
>>
>> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>>
>


Re: Optimize the first map reduce of DStream

2015-03-24 Thread Zoltán Zvara
AFAIK Spark Streaming cannot work in a way like this. Transformations are
made on DStreams, where a DStream basically holds (time,
allocatedBlocksForBatch) pairs. Allocated blocks are allocated by the
JobGenerator; unallocated blocks (their infos) are collected by the
ReceivedBlockTracker. In Spark Streaming you define transformations and
actions on DStreams. The operators define RDD chains, and tasks are created by
spark-core. You manipulate DStreams, not single units of data. Flink, for
example, uses a continuous model; it optimizes for memory usage and latency.
Read the Spark Streaming paper and the Spark paper for more background.
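
To make this concrete: under the micro-batch model, the one-hour count from
the previous mail would look roughly like this against the DStream API
(host/port, batch and window intervals, and the checkpoint path are
illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf().setAppName("HourlyLineCount")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/tmp/checkpoints")                  // needed for windowed operations
val lines = ssc.socketTextStream("localhost", 9999)
// count of lines over a sliding one-hour window, refreshed every 10 minutes
val hourly = lines.countByWindow(Minutes(60), Minutes(10))
hourly.print()
ssc.start()
ssc.awaitTermination()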

Zvara Zoltán



mail, hangout, skype: zoltan.zv...@gmail.com

mobile, viber: +36203129543

bank: 10918001-0021-50480008

address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a

elte: HSKSJZ (ZVZOAAI.ELTE)

2015-03-24 15:03 GMT+01:00 Bin Wang :

> I'm not looking for limit the block size.
>
> Here is another example. Say we want to count the lines from the stream in
> one hour. In a normal program, we may write it like this:
>
> int sum = 0
> while (line = getFromStream()) {
> store(line) // store the line into storage instead of memory.
> sum++
> }
>
> This could be seen as a reduce. The only memory used here is just the
> variable named "line", need not store all the lines into memory (if lines
> would not use in other places). If we want to provide fault tolerance, we
> may just store lines into storage instead of in the memory. Could Spark
> streaming work like this way? Dose Flink work like this?
>
>
>
>
>
> On Tue, Mar 24, 2015 at 7:04 PM Zoltán Zvara 
> wrote:
>
>> There is a BlockGenerator on each worker node next to the
>> ReceiverSupervisorImpl, which generates Blocks out of an ArrayBuffer in
>> each interval (block_interval). These Blocks are passed to
>> ReceiverSupervisorImpl, which throws these blocks to into the BlockManager
>> for storage. BlockInfos are passed to the driver. Mini-batches are created
>> by the JobGenerator component on the driver each batch_interval. I guess
>> what you are looking for is provided by a continuous model like Flink's. We
>> are creating mini-batches to provide fault tolerance.
>>
>> Zvara Zoltán
>>
>>
>>
>> mail, hangout, skype: zoltan.zv...@gmail.com
>>
>> mobile, viber: +36203129543
>>
>> bank: 10918001-0021-50480008
>>
>> address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a
>>
>> elte: HSKSJZ (ZVZOAAI.ELTE)
>>
>> 2015-03-24 11:55 GMT+01:00 Arush Kharbanda :
>>
>>> The block size is configurable and that way I think you can reduce the
>>> block interval, to keep the block in memory only for the limiter
>>> interval?
>>> Is that what you are looking for?
>>>
>>> On Tue, Mar 24, 2015 at 1:38 PM, Bin Wang  wrote:
>>>
>>> > Hi,
>>> >
>>> > I'm learning Spark and I find there could be some optimize for the
>>> current
>>> > streaming implementation. Correct me if I'm wrong.
>>> >
>>> > The current streaming implementation put the data of one batch into
>>> memory
>>> > (as RDD). But it seems not necessary.
>>> >
>>> > For example, if I want to count the lines which contains word "Spark",
>>> I
>>> > just need to map every line to see if it contains word, then reduce it
>>> with
>>> > a sum function. After that, this line is no longer useful to keep it in
>>> > memory.
>>> >
>>> > That is said, if the DStream only have one map and/or reduce operation
>>> on
>>> > it. It is not necessary to keep all the batch data in the memory.
>>> Something
>>> > like a pipeline should be OK.
>>> >
>>> > Is it difficult to implement on top of the current implementation?
>>> >
>>> > Thanks.
>>> >
>>> > ---
>>> > Bin Wang
>>> >
>>>
>>>
>>>
>>> --
>>>
>>> [image: Sigmoid Analytics] 
>>>
>>> *Arush Kharbanda* || Technical Teamlead
>>>
>>> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>>>
>>


Re: hadoop input/output format advanced control

2015-03-24 Thread Imran Rashid
I think this would be a great addition; I totally agree that you need to be
able to set these at a finer context than just the SparkContext.

Just to play devil's advocate, though -- the alternative is for you to just
subclass HadoopRDD yourself, or make a totally new RDD, and then you could
expose whatever you need.  Why is this solution better?  IMO the criteria
are:
(a) common operations
(b) error-prone / difficult to implement
(c) non-obvious, but important for performance

I think this case fits (a) & (c), so I think it's still worthwhile.  But it's
also worth asking whether or not it's too difficult for a user to extend
HadoopRDD right now.  There have been several cases in the past week where
we've suggested that a user should read from HDFS themselves (e.g., to read
multiple files together in one partition) -- with*out* reusing the code in
HadoopRDD, though they would lose things like the metric tracking &
preferred locations you get from HadoopRDD.  Does HadoopRDD need some
refactoring to make that easier to do?  Or do we just need a good example?

Imran

(sorry for hijacking your thread, Koert)



On Mon, Mar 23, 2015 at 3:52 PM, Koert Kuipers  wrote:

> see email below. reynold suggested i send it to dev instead of user
>
> -- Forwarded message --
> From: Koert Kuipers 
> Date: Mon, Mar 23, 2015 at 4:36 PM
> Subject: hadoop input/output format advanced control
> To: "u...@spark.apache.org" 
>
>
> currently its pretty hard to control the Hadoop Input/Output formats used
> in Spark. The conventions seems to be to add extra parameters to all
> methods and then somewhere deep inside the code (for example in
> PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into
> settings on the Hadoop Configuration object.
>
> for example for compression i see "codec: Option[Class[_ <:
> CompressionCodec]] = None" added to a bunch of methods.
>
> how scalable is this solution really?
>
> for example i need to read from a hadoop dataset and i dont want the input
> (part) files to get split up. the way to do this is to set
> "mapred.min.split.size". now i dont want to set this at the level of the
> SparkContext (which can be done), since i dont want it to apply to input
> formats in general. i want it to apply to just this one specific input
> dataset i need to read. which leaves me with no options currently. i could
> go add yet another input parameter to all the methods
> (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
> etc.). but that seems ineffective.
>
> why can we not expose a Map[String, String] or some other generic way to
> manipulate settings for hadoop input/output formats? it would require
> adding one more parameter to all methods to deal with hadoop input/output
> formats, but after that its done. one parameter to rule them all
>
> then i could do:
> val x = sc.textFile("/some/path", formatSettings =
> Map("mapred.min.split.size" -> "12345"))
>
> or
> rdd.saveAsTextFile("/some/path, formatSettings =
> Map(mapred.output.compress" -> "true", "mapred.output.compression.codec" ->
> "somecodec"))
>


Re: Spark Executor resources

2015-03-24 Thread Sandy Ryza
Hi Zoltan,

If running on YARN, the YARN NodeManager starts the executors.  I don't think
there's a 100% precise way for the Spark executor to know how many
resources are allotted to it.  It can come close by looking at the Spark
configuration options used to request it (spark.executor.memory and
spark.yarn.executor.memoryOverhead), but it can't necessarily account for the
amount that YARN has rounded up if those YARN configuration properties
(yarn.scheduler.minimum-allocation-mb and
yarn.scheduler.increment-allocation-mb) are not present on the node.
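
As a rough sketch of that approximation from inside executor-side code (the
property names are as above; the 384 MB default I use for the overhead is an
assumption, not necessarily what YARN mode actually applies):

import org.apache.spark.SparkEnv

// Run inside a task / executor-side code, where SparkEnv.get is the executor's env.
val conf = SparkEnv.get.conf
val requestedMem = conf.get("spark.executor.memory", "1g")               // what was requested
val overheadMb   = conf.get("spark.yarn.executor.memoryOverhead", "384") // MB; default here is an assumption
val jvmHeap      = Runtime.getRuntime.maxMemory                          // what the executor JVM actually got
println(s"requested=$requestedMem, yarnOverheadMb=$overheadMb, jvmHeapBytes=$jvmHeap")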

-Sandy

On Mon, Mar 23, 2015 at 5:08 PM, Zoltán Zvara 
wrote:

> Let's say I'm an Executor instance in a Spark system. Who started me and
> where, when I run on a worker node supervised by (a) Mesos, (b) YARN? I
> suppose I'm the only one Executor on a worker node for a given framework
> scheduler (driver). If I'm an Executor instance, who is the closest object
> to me who can tell me how many resources do I have on (a) Mesos, (b) YARN?
>
> Thank you for your kind input!
>
> Zvara Zoltán
>
>
>
> mail, hangout, skype: zoltan.zv...@gmail.com
>
> mobile, viber: +36203129543
>
> bank: 10918001-0021-50480008
>
> address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a
>
> elte: HSKSJZ (ZVZOAAI.ELTE)
>


Re: Spark Executor resources

2015-03-24 Thread Zoltán Zvara
Thank you for your response!

I guess the (Spark) AM, which hands the container lease to the NM (along with
the executor JAR and the command to run), must know how much CPU and RAM that
container is capped and isolated at. If I'm right, there must be a resource
vector along with the encrypted container lease that describes this. Or maybe
there is a way for the ExecutorBackend to fetch this information directly from
the environment? Then the ExecutorBackend would be able to hand this
information over to the actual Executor, which creates the TaskRunner.

Zvara Zoltán



mail, hangout, skype: zoltan.zv...@gmail.com

mobile, viber: +36203129543

bank: 10918001-0021-50480008

address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a

elte: HSKSJZ (ZVZOAAI.ELTE)

2015-03-24 16:30 GMT+01:00 Sandy Ryza :

> Hi Zoltan,
>
> If running on YARN, the YARN NodeManager starts executors.  I don't think
> there's a 100% precise way for the Spark executor way to know how many
> resources are allotted to it.  It can come close by looking at the Spark
> configuration options used to request it (spark.executor.memory and
> spark.yarn.executor.memoryOverhead), but it can't necessarily for the
> amount that YARN has rounded up if those configuration properties
> (yarn.scheduler.minimum-allocation-mb and
> yarn.scheduler.increment-allocation-mb) are not present on the node.
>
> -Sandy
>
> -Sandy
>
> On Mon, Mar 23, 2015 at 5:08 PM, Zoltán Zvara 
> wrote:
>
>> Let's say I'm an Executor instance in a Spark system. Who started me and
>> where, when I run on a worker node supervised by (a) Mesos, (b) YARN? I
>> suppose I'm the only one Executor on a worker node for a given framework
>> scheduler (driver). If I'm an Executor instance, who is the closest object
>> to me who can tell me how many resources do I have on (a) Mesos, (b) YARN?
>>
>> Thank you for your kind input!
>>
>> Zvara Zoltán
>>
>>
>>
>> mail, hangout, skype: zoltan.zv...@gmail.com
>>
>> mobile, viber: +36203129543
>>
>> bank: 10918001-0021-50480008
>>
>> address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a
>>
>> elte: HSKSJZ (ZVZOAAI.ELTE)
>>
>
>


Re: Spark Executor resources

2015-03-24 Thread Sandy Ryza
That's correct.  What's the reason this information is needed?

-Sandy

On Tue, Mar 24, 2015 at 11:41 AM, Zoltán Zvara 
wrote:

> Thank you for your response!
>
> I guess the (Spark)AM, who gives the container leash to the NM (along with
> the executor JAR and command to run) must know how many CPU or RAM that
> container capped, isolated at. There must be a resource vector along the
> encrypted container leash if I'm right that describes this. Or maybe is
> there a way for the ExecutorBackend to fetch this information directly from
> the environment? Then, the ExecutorBackend would be able to hand over this
> information to the actual Executor who creates the TaskRunner.
>
> Zvara Zoltán
>
>
>
> mail, hangout, skype: zoltan.zv...@gmail.com
>
> mobile, viber: +36203129543
>
> bank: 10918001-0021-50480008
>
> address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a
>
> elte: HSKSJZ (ZVZOAAI.ELTE)
>
> 2015-03-24 16:30 GMT+01:00 Sandy Ryza :
>
>> Hi Zoltan,
>>
>> If running on YARN, the YARN NodeManager starts executors.  I don't think
>> there's a 100% precise way for the Spark executor way to know how many
>> resources are allotted to it.  It can come close by looking at the Spark
>> configuration options used to request it (spark.executor.memory and
>> spark.yarn.executor.memoryOverhead), but it can't necessarily for the
>> amount that YARN has rounded up if those configuration properties
>> (yarn.scheduler.minimum-allocation-mb and
>> yarn.scheduler.increment-allocation-mb) are not present on the node.
>>
>> -Sandy
>>
>> -Sandy
>>
>> On Mon, Mar 23, 2015 at 5:08 PM, Zoltán Zvara 
>> wrote:
>>
>>> Let's say I'm an Executor instance in a Spark system. Who started me and
>>> where, when I run on a worker node supervised by (a) Mesos, (b) YARN? I
>>> suppose I'm the only one Executor on a worker node for a given framework
>>> scheduler (driver). If I'm an Executor instance, who is the closest
>>> object
>>> to me who can tell me how many resources do I have on (a) Mesos, (b)
>>> YARN?
>>>
>>> Thank you for your kind input!
>>>
>>> Zvara Zoltán
>>>
>>>
>>>
>>> mail, hangout, skype: zoltan.zv...@gmail.com
>>>
>>> mobile, viber: +36203129543
>>>
>>> bank: 10918001-0021-50480008
>>>
>>> address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a
>>>
>>> elte: HSKSJZ (ZVZOAAI.ELTE)
>>>
>>
>>
>


Re: Spark Executor resources

2015-03-24 Thread Zoltán Zvara
I'm trying to log Tasks to understand the physical plan and to visualize which
partition of which RDD is currently being computed, from which creation site,
along with other information. I want to have the TaskRunner do this just
before actually invoking runTask() on the Task, and again just before handing
the Task to the GC, when metrics are collected. Along with the information I
wish to log, I also want to report the resources the Executor allocates to run
its Tasks.

Zvara Zoltán



mail, hangout, skype: zoltan.zv...@gmail.com

mobile, viber: +36203129543

bank: 10918001-0021-50480008

address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a

elte: HSKSJZ (ZVZOAAI.ELTE)

2015-03-24 16:42 GMT+01:00 Sandy Ryza :

> That's correct.  What's the reason this information is needed?
>
> -Sandy
>
> On Tue, Mar 24, 2015 at 11:41 AM, Zoltán Zvara 
> wrote:
>
>> Thank you for your response!
>>
>> I guess the (Spark)AM, who gives the container leash to the NM (along
>> with the executor JAR and command to run) must know how many CPU or RAM
>> that container capped, isolated at. There must be a resource vector along
>> the encrypted container leash if I'm right that describes this. Or maybe is
>> there a way for the ExecutorBackend to fetch this information directly from
>> the environment? Then, the ExecutorBackend would be able to hand over this
>> information to the actual Executor who creates the TaskRunner.
>>
>> Zvara Zoltán
>>
>>
>>
>> mail, hangout, skype: zoltan.zv...@gmail.com
>>
>> mobile, viber: +36203129543
>>
>> bank: 10918001-0021-50480008
>>
>> address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a
>>
>> elte: HSKSJZ (ZVZOAAI.ELTE)
>>
>> 2015-03-24 16:30 GMT+01:00 Sandy Ryza :
>>
>>> Hi Zoltan,
>>>
>>> If running on YARN, the YARN NodeManager starts executors.  I don't
>>> think there's a 100% precise way for the Spark executor way to know how
>>> many resources are allotted to it.  It can come close by looking at the
>>> Spark configuration options used to request it (spark.executor.memory and
>>> spark.yarn.executor.memoryOverhead), but it can't necessarily for the
>>> amount that YARN has rounded up if those configuration properties
>>> (yarn.scheduler.minimum-allocation-mb and
>>> yarn.scheduler.increment-allocation-mb) are not present on the node.
>>>
>>> -Sandy
>>>
>>> -Sandy
>>>
>>> On Mon, Mar 23, 2015 at 5:08 PM, Zoltán Zvara 
>>> wrote:
>>>
 Let's say I'm an Executor instance in a Spark system. Who started me and
 where, when I run on a worker node supervised by (a) Mesos, (b) YARN? I
 suppose I'm the only one Executor on a worker node for a given framework
 scheduler (driver). If I'm an Executor instance, who is the closest
 object
 to me who can tell me how many resources do I have on (a) Mesos, (b)
 YARN?

 Thank you for your kind input!

 Zvara Zoltán



 mail, hangout, skype: zoltan.zv...@gmail.com

 mobile, viber: +36203129543

 bank: 10918001-0021-50480008

 address: Hungary, 2475 Kápolnásnyék, Kossuth 6/a

 elte: HSKSJZ (ZVZOAAI.ELTE)

>>>
>>>
>>
>


Understanding shuffle file name conflicts

2015-03-24 Thread Kannan Rajah
I am working on SPARK-1529. I ran into an issue with my change, where the
same shuffle file was being reused across 2 jobs. Please note this only
happens when I use a hard-coded location for shuffle files, say
"/tmp". It does not happen with the normal code path that uses DiskBlockManager
to pick different directories for each run. So I want to understand how
DiskBlockManager guarantees that such a conflict will never happen.

Let's say the shuffle block id has a value of shuffle_0_0_0. So the data
file name is shuffle_0_0_0.data and the index file name is shuffle_0_0_0.index.
If I run a Spark job twice, one after another, these files get created
under different directories because of the hashing logic in
DiskBlockManager. But the hash is based on the file name, so how are we
sure that there won't ever be a conflict?
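
For reference, my rough understanding of the hashing scheme is the following
(an illustrative sketch, not the actual Spark source; the per-run uniqueness
seems to come from the application-specific root directories rather than from
the hash itself -- which is exactly the part I would like to confirm):

import java.io.File
import java.util.UUID

// Illustrative sketch: one root local dir per application run, named with a random UUID.
val subDirsPerLocalDir = 64
val localDirs = Array(new File(s"/tmp/spark-local-${UUID.randomUUID()}"))

def getFile(filename: String): File = {
  val hash = filename.hashCode & Integer.MAX_VALUE            // non-negative hash of the name
  val dirId = hash % localDirs.length                         // pick a root local dir
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
  subDir.mkdirs()
  new File(subDir, filename)                                  // same name always hashes to the same place within a run
}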

--
Kannan


Re: Any guidance on when to back port and how far?

2015-03-24 Thread Michael Armbrust
Two other criteria that I use when deciding what to backport:
 - Is it a regression from a previous minor release?  I'm much more likely
to backport fixes in this case, as I'd love for most people to stay up to
date.
 - How scary is the change?  I think the primary goal is stability of the
maintenance branches.  When I am confident that something is isolated and
unlikely to break things (i.e. I'm fixing a confusing error message), then
I'm much more likely to backport it.

Regarding the length of time to continue backporting, I mostly don't
backport to N-1, but this is partially because SQL is changing too fast for
that to generally be useful.  These old branches usually only get attention
from me when there is an explicit request.

I'd love to hear more feedback from others.

Michael

On Tue, Mar 24, 2015 at 6:13 AM, Sean Owen  wrote:

> So far, my rule of thumb has been:
>
> - Don't back-port new features or improvements in general, only bug fixes
> - Don't back-port minor bug fixes
> - Back-port bug fixes that seem important enough to not wait for the
> next minor release
> - Back-port site doc changes to the release most likely to go out
> next, to make it a part of the next site publish
>
> But, how far should back-ports go, in general? If the last minor
> release was 1.N, then to branch 1.N surely. Farther back is a question
> of expectation for support of past minor releases. Given the pace of
> change and time available, I assume there's not much support for
> continuing to use release 1.(N-1) and very little for 1.(N-2).
>
> Concretely: does anyone expect a 1.1.2 release ever? a 1.2.2 release?
> It'd be good to hear the received wisdom explicitly.
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


Re: hadoop input/output format advanced control

2015-03-24 Thread Nick Pentreath
Imran, on your point to read multiple files together in a partition, is it
not simpler to use the approach of copy Hadoop conf and set per-RDD
settings for min split to control the input size per partition, together
with something like CombineFileInputFormat?

On Tue, Mar 24, 2015 at 5:28 PM, Imran Rashid  wrote:

> I think this would be a great addition, I totally agree that you need to be
> able to set these at a finer context than just the SparkContext.
>
> Just to play devil's advocate, though -- the alternative is for you just
> subclass HadoopRDD yourself, or make a totally new RDD, and then you could
> expose whatever you need.  Why is this solution better?  IMO the criteria
> are:
> (a) common operations
> (b) error-prone / difficult to implement
> (c) non-obvious, but important for performance
>
> I think this case fits (a) & (c), so I think its still worthwhile.  But its
> also worth asking whether or not its too difficult for a user to extend
> HadoopRDD right now.  There have been several cases in the past week where
> we've suggested that a user should read from hdfs themselves (eg., to read
> multiple files together in one partition) -- with*out* reusing the code in
> HadoopRDD, though they would lose things like the metric tracking &
> preferred locations you get from HadoopRDD.  Does HadoopRDD need to some
> refactoring to make that easier to do?  Or do we just need a good example?
>
> Imran
>
> (sorry for hijacking your thread, Koert)
>
>
>
> On Mon, Mar 23, 2015 at 3:52 PM, Koert Kuipers  wrote:
>
> > see email below. reynold suggested i send it to dev instead of user
> >
> > -- Forwarded message --
> > From: Koert Kuipers 
> > Date: Mon, Mar 23, 2015 at 4:36 PM
> > Subject: hadoop input/output format advanced control
> > To: "u...@spark.apache.org" 
> >
> >
> > currently its pretty hard to control the Hadoop Input/Output formats used
> > in Spark. The conventions seems to be to add extra parameters to all
> > methods and then somewhere deep inside the code (for example in
> > PairRDDFunctions.saveAsHadoopFile) all these parameters get translated
> into
> > settings on the Hadoop Configuration object.
> >
> > for example for compression i see "codec: Option[Class[_ <:
> > CompressionCodec]] = None" added to a bunch of methods.
> >
> > how scalable is this solution really?
> >
> > for example i need to read from a hadoop dataset and i dont want the
> input
> > (part) files to get split up. the way to do this is to set
> > "mapred.min.split.size". now i dont want to set this at the level of the
> > SparkContext (which can be done), since i dont want it to apply to input
> > formats in general. i want it to apply to just this one specific input
> > dataset i need to read. which leaves me with no options currently. i
> could
> > go add yet another input parameter to all the methods
> > (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
> > etc.). but that seems ineffective.
> >
> > why can we not expose a Map[String, String] or some other generic way to
> > manipulate settings for hadoop input/output formats? it would require
> > adding one more parameter to all methods to deal with hadoop input/output
> > formats, but after that its done. one parameter to rule them all
> >
> > then i could do:
> > val x = sc.textFile("/some/path", formatSettings =
> > Map("mapred.min.split.size" -> "12345"))
> >
> > or
> > rdd.saveAsTextFile("/some/path, formatSettings =
> > Map(mapred.output.compress" -> "true", "mapred.output.compression.codec"
> ->
> > "somecodec"))
> >
>


Re: Any guidance on when to back port and how far?

2015-03-24 Thread Patrick Wendell
My philosophy has been basically what you suggested, Sean. One thing
you didn't mention, though, is that if a bug fix seems complicated, I will
think very hard before back-porting it. This is because "fixes" can
introduce their own new bugs, in some cases worse than the original
issue. It's really bad to have someone upgrade to a patch release and see
a regression - with our current approach this almost never happens.

I will usually try to backport up to N-2, if it can be back-ported
reasonably easily (for instance, with minor or no code changes). The
reason I do this is that vendors do end up supporting older versions,
and it's nice for them if some committer has backported a fix that
they can then pull in, even if we never ship it.

In terms of doing older maintenance releases, I think we should do
these according to the severity of the issues (for instance, if there is a
security issue) or based on general demand from the community. I
haven't initiated many 1.X.2 releases recently because I didn't see
huge demand. However, personally I don't mind doing these if there is
a lot of demand, at least for releases where ".0" has gone out in the
last six months.

On Tue, Mar 24, 2015 at 11:23 AM, Michael Armbrust
 wrote:
> Two other criteria that I use when deciding what to backport:
>  - Is it a regression from a previous minor release?  I'm much more likely
> to backport fixes in this case, as I'd love for most people to stay up to
> date.
>  - How scary is the change?  I think the primary goal is stability of the
> maintenance branches.  When I am confident that something is isolated and
> unlikely to break things (i.e. I'm fixing a confusing error message), then
> i'm much more likely to backport it.
>
> Regarding the length of time to continue backporting, I mostly don't
> backport to N-1, but this is partially because SQL is changing too fast for
> that to generally be useful.  These old branches usually only get attention
> from me when there is an explicit request.
>
> I'd love to hear more feedback from others.
>
> Michael
>
> On Tue, Mar 24, 2015 at 6:13 AM, Sean Owen  wrote:
>
>> So far, my rule of thumb has been:
>>
>> - Don't back-port new features or improvements in general, only bug fixes
>> - Don't back-port minor bug fixes
>> - Back-port bug fixes that seem important enough to not wait for the
>> next minor release
>> - Back-port site doc changes to the release most likely to go out
>> next, to make it a part of the next site publish
>>
>> But, how far should back-ports go, in general? If the last minor
>> release was 1.N, then to branch 1.N surely. Farther back is a question
>> of expectation for support of past minor releases. Given the pace of
>> change and time available, I assume there's not much support for
>> continuing to use release 1.(N-1) and very little for 1.(N-2).
>>
>> Concretely: does anyone expect a 1.1.2 release ever? a 1.2.2 release?
>> It'd be good to hear the received wisdom explicitly.
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: hadoop input/output format advanced control

2015-03-24 Thread Patrick Wendell
Yeah - to Nick's point, I think the way to do this is to pass in a
custom conf when you create a Hadoop RDD (that's AFAIK why the conf
field is there). Is there anything you can't do with that feature?
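
For example, here is a sketch of that per-RDD conf approach (assuming an
existing SparkContext sc; the path and the min-split setting are
illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Copy the shared Hadoop conf so the tweak applies only to this one input.
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("mapred.min.split.size", "12345")

val lines = sc.newAPIHadoopFile(
  "/some/path",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf
).map { case (_, text) => text.toString }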

On Tue, Mar 24, 2015 at 11:50 AM, Nick Pentreath
 wrote:
> Imran, on your point to read multiple files together in a partition, is it
> not simpler to use the approach of copy Hadoop conf and set per-RDD
> settings for min split to control the input size per partition, together
> with something like CombineFileInputFormat?
>
> On Tue, Mar 24, 2015 at 5:28 PM, Imran Rashid  wrote:
>
>> I think this would be a great addition, I totally agree that you need to be
>> able to set these at a finer context than just the SparkContext.
>>
>> Just to play devil's advocate, though -- the alternative is for you just
>> subclass HadoopRDD yourself, or make a totally new RDD, and then you could
>> expose whatever you need.  Why is this solution better?  IMO the criteria
>> are:
>> (a) common operations
>> (b) error-prone / difficult to implement
>> (c) non-obvious, but important for performance
>>
>> I think this case fits (a) & (c), so I think its still worthwhile.  But its
>> also worth asking whether or not its too difficult for a user to extend
>> HadoopRDD right now.  There have been several cases in the past week where
>> we've suggested that a user should read from hdfs themselves (eg., to read
>> multiple files together in one partition) -- with*out* reusing the code in
>> HadoopRDD, though they would lose things like the metric tracking &
>> preferred locations you get from HadoopRDD.  Does HadoopRDD need to some
>> refactoring to make that easier to do?  Or do we just need a good example?
>>
>> Imran
>>
>> (sorry for hijacking your thread, Koert)
>>
>>
>>
>> On Mon, Mar 23, 2015 at 3:52 PM, Koert Kuipers  wrote:
>>
>> > see email below. reynold suggested i send it to dev instead of user
>> >
>> > -- Forwarded message --
>> > From: Koert Kuipers 
>> > Date: Mon, Mar 23, 2015 at 4:36 PM
>> > Subject: hadoop input/output format advanced control
>> > To: "u...@spark.apache.org" 
>> >
>> >
>> > currently its pretty hard to control the Hadoop Input/Output formats used
>> > in Spark. The conventions seems to be to add extra parameters to all
>> > methods and then somewhere deep inside the code (for example in
>> > PairRDDFunctions.saveAsHadoopFile) all these parameters get translated
>> into
>> > settings on the Hadoop Configuration object.
>> >
>> > for example for compression i see "codec: Option[Class[_ <:
>> > CompressionCodec]] = None" added to a bunch of methods.
>> >
>> > how scalable is this solution really?
>> >
>> > for example i need to read from a hadoop dataset and i dont want the
>> input
>> > (part) files to get split up. the way to do this is to set
>> > "mapred.min.split.size". now i dont want to set this at the level of the
>> > SparkContext (which can be done), since i dont want it to apply to input
>> > formats in general. i want it to apply to just this one specific input
>> > dataset i need to read. which leaves me with no options currently. i
>> could
>> > go add yet another input parameter to all the methods
>> > (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
>> > etc.). but that seems ineffective.
>> >
>> > why can we not expose a Map[String, String] or some other generic way to
>> > manipulate settings for hadoop input/output formats? it would require
>> > adding one more parameter to all methods to deal with hadoop input/output
>> > formats, but after that its done. one parameter to rule them all
>> >
>> > then i could do:
>> > val x = sc.textFile("/some/path", formatSettings =
>> > Map("mapred.min.split.size" -> "12345"))
>> >
>> > or
>> > rdd.saveAsTextFile("/some/path, formatSettings =
>> > Map(mapred.output.compress" -> "true", "mapred.output.compression.codec"
>> ->
>> > "somecodec"))
>> >
>>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Experience using binary packages on various Hadoop distros

2015-03-24 Thread Patrick Wendell
Hey All,

For a while we've published binary packages with different Hadoop
client's pre-bundled. We currently have three interfaces to a Hadoop
cluster (a) the HDFS client (b) the YARN client (c) the Hive client.

Because (a) and (b) are supposed to be backwards-compatible
interfaces, my working assumption was that for the most part (modulo
Hive) our packages work with *newer* Hadoop versions. For instance,
our Hadoop 2.4 package should work with HDFS 2.6 and YARN 2.6.
However, I have heard murmurings that these are not compatible in
practice.

So I have three questions I'd like to put out to the community:

1. Have people had difficulty using 2.4 packages with newer Hadoop
versions? If so, what specific incompatibilities have you hit?
2. Have people had issues using our binary Hadoop packages in general
with commercial or Apache Hadoop distros, such that you have to build
from source?
3. How would people feel about publishing a "bring your own Hadoop"
binary, where you are required to point us to a local Hadoop
distribution by setting HADOOP_HOME? This might be better for ensuring
full compatibility:
https://issues.apache.org/jira/browse/SPARK-6511

- Patrick

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Spark-thriftserver Issue

2015-03-24 Thread Anubhav Agarwal
Zhan, specifying the port fixed the port issue.

Is it possible to specify the log directory while starting the Spark
thriftserver?
I'm still getting this error even though the folder exists and everyone has
permission to use that directory.
drwxr-xr-x  2 root root  4096 Mar 24 19:04 spark-events


Exception in thread "main" java.lang.IllegalArgumentException: Log
directory /tmp/spark-events does not exist.
at
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:99)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:399)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:49)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:58)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



On Mon, Mar 23, 2015 at 6:51 PM, Zhan Zhang  wrote:

>  Probably the port is already used by others, e.g., hive. You can change
> the port similar to below
>
>   ./sbin/start-thriftserver.sh --master yarn --executor-memory 512m 
> --hiveconf hive.server2.thrift.port=10001
>
>
>  Thanks.
>
>  Zhan Zhang
>
>   On Mar 23, 2015, at 12:01 PM, Neil Dev  wrote:
>
> Hi,
>
> I am having issue starting spark-thriftserver. I'm running spark 1.3.with
> Hadoop 2.4.0. I would like to be able to change its port too so, I can hive
> hive-thriftserver as well as spark-thriftserver running at the same time.
>
> Starting sparkthrift server:-
> sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077
> --executor-memory 2G
>
> Error:-
> I created the folder manually but still getting the following error
> Exception in thread "main" java.lang.IllegalArgumentException: Log
> directory /tmp/spark-events does not exist.
>
>
> I am getting the following error
> 15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error:
> org.apache.thrift.transport.TTransportException: Could not create
> ServerSocket on address0.0.0.0/0.0.0.0:1.
>at
> org.apache.thrift.transport.TServerSocket.(TServerSocket.java:93)
>at
> org.apache.thrift.transport.TServerSocket.(TServerSocket.java:79)
>at
>
> org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
>at
>
> org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
>at java.lang.Thread.run(Thread.java:745)
>
> Thanks
> Neil
>
>
>


Re: Spark-thriftserver Issue

2015-03-24 Thread Zhan Zhang
You can try to set it in spark-env.sh.

# - SPARK_LOG_DIR   Where log files are stored.  (Default: 
${SPARK_HOME}/logs)
# - SPARK_PID_DIR   Where the pid file is stored. (Default: /tmp)

Thanks.

Zhan Zhang

On Mar 24, 2015, at 12:10 PM, Anubhav Agarwal  wrote:

Zhan specifying port fixed the port issue.

Is it possible to specify the log directory while starting the spark 
thriftserver?
Still getting this error even through the folder exists and everyone has 
permission to use that directory.
drwxr-xr-x  2 root root  4096 Mar 24 19:04 spark-events


Exception in thread "main" java.lang.IllegalArgumentException: Log directory 
/tmp/spark-events does not exist.
at 
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:99)
at org.apache.spark.SparkContext.(SparkContext.scala:399)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:49)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:58)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



On Mon, Mar 23, 2015 at 6:51 PM, Zhan Zhang  wrote:
Probably the port is already used by others, e.g., hive. You can change the 
port similar to below


 ./sbin/start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf 
hive.server2.thrift.port=10001

Thanks.

Zhan Zhang

On Mar 23, 2015, at 12:01 PM, Neil Dev  wrote:

Hi,

I am having issue starting spark-thriftserver. I'm running spark 1.3.with
Hadoop 2.4.0. I would like to be able to change its port too so, I can hive
hive-thriftserver as well as spark-thriftserver running at the same time.

Starting sparkthrift server:-
sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077
--executor-memory 2G

Error:-
I created the folder manually but still getting the following error
Exception in thread "main" java.lang.IllegalArgumentException: Log
directory /tmp/spark-events does not exist.


I am getting the following error
15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create
ServerSocket on address0.0.0.0/0.0.0.0:1.
   at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:93)
   at
org.apache.thrift.transport.TServerSocket.(TServerSocket.java:79)
   at
org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
   at
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
   at java.lang.Thread.run(Thread.java:745)

Thanks
Neil





Re: Experience using binary packages on various Hadoop distros

2015-03-24 Thread Matei Zaharia
Just a note, one challenge with the BYOH version might be that users who 
download that can't run in local mode without also having Hadoop. But if we 
describe it correctly then hopefully it's okay.

Matei

> On Mar 24, 2015, at 3:05 PM, Patrick Wendell  wrote:
> 
> Hey All,
> 
> For a while we've published binary packages with different Hadoop
> client's pre-bundled. We currently have three interfaces to a Hadoop
> cluster (a) the HDFS client (b) the YARN client (c) the Hive client.
> 
> Because (a) and (b) are supposed to be backwards compatible
> interfaces. My working assumption was that for the most part (modulo
> Hive) our packages work with *newer* Hadoop versions. For instance,
> our Hadoop 2.4 package should work with HDFS 2.6 and YARN 2.6.
> However, I have heard murmurings that these are not compatible in
> practice.
> 
> So I have three questions I'd like to put out to the community:
> 
> 1. Have people had difficulty using 2.4 packages with newer Hadoop
> versions? If so, what specific incompatibilities have you hit?
> 2. Have people had issues using our binary Hadoop packages in general
> with commercial or Apache Hadoop distro's, such that you have to build
> from source?
> 3. How would people feel about publishing a "bring your own Hadoop"
> binary, where you are required to point us to a local Hadoop
> distribution by setting HADOOP_HOME? This might be better for ensuring
> full compatibility:
> https://issues.apache.org/jira/browse/SPARK-6511
> 
> - Patrick
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: hadoop input/output format advanced control

2015-03-24 Thread Koert Kuipers
I would like to use objectFile with some tweaks to the Hadoop conf.
Currently there is no way to do that, except by recreating objectFile myself,
and some of the code objectFile uses I have no access to, since it's private
to Spark.


On Tue, Mar 24, 2015 at 2:59 PM, Patrick Wendell  wrote:

> Yeah - to Nick's point, I think the way to do this is to pass in a
> custom conf when you create a Hadoop RDD (that's AFAIK why the conf
> field is there). Is there anything you can't do with that feature?
>
> On Tue, Mar 24, 2015 at 11:50 AM, Nick Pentreath
>  wrote:
> > Imran, on your point to read multiple files together in a partition, is
> it
> > not simpler to use the approach of copy Hadoop conf and set per-RDD
> > settings for min split to control the input size per partition, together
> > with something like CombineFileInputFormat?
> >
> > On Tue, Mar 24, 2015 at 5:28 PM, Imran Rashid 
> wrote:
> >
> >> I think this would be a great addition, I totally agree that you need
> to be
> >> able to set these at a finer context than just the SparkContext.
> >>
> >> Just to play devil's advocate, though -- the alternative is for you just
> >> subclass HadoopRDD yourself, or make a totally new RDD, and then you
> could
> >> expose whatever you need.  Why is this solution better?  IMO the
> criteria
> >> are:
> >> (a) common operations
> >> (b) error-prone / difficult to implement
> >> (c) non-obvious, but important for performance
> >>
> >> I think this case fits (a) & (c), so I think its still worthwhile.  But
> its
> >> also worth asking whether or not its too difficult for a user to extend
> >> HadoopRDD right now.  There have been several cases in the past week
> where
> >> we've suggested that a user should read from hdfs themselves (eg., to
> read
> >> multiple files together in one partition) -- with*out* reusing the code
> in
> >> HadoopRDD, though they would lose things like the metric tracking &
> >> preferred locations you get from HadoopRDD.  Does HadoopRDD need to some
> >> refactoring to make that easier to do?  Or do we just need a good
> example?
> >>
> >> Imran
> >>
> >> (sorry for hijacking your thread, Koert)
> >>
> >>
> >>
> >> On Mon, Mar 23, 2015 at 3:52 PM, Koert Kuipers 
> wrote:
> >>
> >> > see email below. reynold suggested i send it to dev instead of user
> >> >
> >> > -- Forwarded message --
> >> > From: Koert Kuipers 
> >> > Date: Mon, Mar 23, 2015 at 4:36 PM
> >> > Subject: hadoop input/output format advanced control
> >> > To: "u...@spark.apache.org" 
> >> >
> >> >
> >> > currently its pretty hard to control the Hadoop Input/Output formats
> used
> >> > in Spark. The conventions seems to be to add extra parameters to all
> >> > methods and then somewhere deep inside the code (for example in
> >> > PairRDDFunctions.saveAsHadoopFile) all these parameters get translated
> >> into
> >> > settings on the Hadoop Configuration object.
> >> >
> >> > for example for compression i see "codec: Option[Class[_ <:
> >> > CompressionCodec]] = None" added to a bunch of methods.
> >> >
> >> > how scalable is this solution really?
> >> >
> >> > for example i need to read from a hadoop dataset and i dont want the
> >> input
> >> > (part) files to get split up. the way to do this is to set
> >> > "mapred.min.split.size". now i dont want to set this at the level of
> the
> >> > SparkContext (which can be done), since i dont want it to apply to
> input
> >> > formats in general. i want it to apply to just this one specific input
> >> > dataset i need to read. which leaves me with no options currently. i
> >> could
> >> > go add yet another input parameter to all the methods
> >> > (SparkContext.textFile, SparkContext.hadoopFile,
> SparkContext.objectFile,
> >> > etc.). but that seems ineffective.
> >> >
> >> > why can we not expose a Map[String, String] or some other generic way
> to
> >> > manipulate settings for hadoop input/output formats? it would require
> >> > adding one more parameter to all methods to deal with hadoop
> input/output
> >> > formats, but after that its done. one parameter to rule them all
> >> >
> >> > then i could do:
> >> > val x = sc.textFile("/some/path", formatSettings =
> >> > Map("mapred.min.split.size" -> "12345"))
> >> >
> >> > or
> >> > rdd.saveAsTextFile("/some/path, formatSettings =
> >> > Map(mapred.output.compress" -> "true",
> "mapred.output.compression.codec"
> >> ->
> >> > "somecodec"))
> >> >
> >>
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


Re: hadoop input/output format advanced control

2015-03-24 Thread Koert Kuipers
The (compression) codec parameter that is now part of many saveAs...
methods came from a similar need; see SPARK-763.

Hadoop has many options like this. You are either going to have to allow many
more of these optional arguments on all the methods that read from Hadoop
InputFormats and write to Hadoop OutputFormats, or you force people to
re-create these methods using HadoopRDD, I think (if that's even possible).

On Tue, Mar 24, 2015 at 6:40 PM, Koert Kuipers  wrote:

> i would like to use objectFile with some tweaks to the hadoop conf.
> currently there is no way to do that, except recreating objectFile myself.
> and some of the code objectFile uses i have no access to, since its private
> to spark.
>
>
> On Tue, Mar 24, 2015 at 2:59 PM, Patrick Wendell 
> wrote:
>
>> Yeah - to Nick's point, I think the way to do this is to pass in a
>> custom conf when you create a Hadoop RDD (that's AFAIK why the conf
>> field is there). Is there anything you can't do with that feature?
>>
>> On Tue, Mar 24, 2015 at 11:50 AM, Nick Pentreath
>>  wrote:
>> > Imran, on your point to read multiple files together in a partition, is
>> it
>> > not simpler to use the approach of copy Hadoop conf and set per-RDD
>> > settings for min split to control the input size per partition, together
>> > with something like CombineFileInputFormat?
>> >
>> > On Tue, Mar 24, 2015 at 5:28 PM, Imran Rashid 
>> wrote:
>> >
>> >> I think this would be a great addition, I totally agree that you need
>> to be
>> >> able to set these at a finer context than just the SparkContext.
>> >>
>> >> Just to play devil's advocate, though -- the alternative is for you
>> just
>> >> subclass HadoopRDD yourself, or make a totally new RDD, and then you
>> could
>> >> expose whatever you need.  Why is this solution better?  IMO the
>> criteria
>> >> are:
>> >> (a) common operations
>> >> (b) error-prone / difficult to implement
>> >> (c) non-obvious, but important for performance
>> >>
>> >> I think this case fits (a) & (c), so I think its still worthwhile.
>> But its
>> >> also worth asking whether or not its too difficult for a user to extend
>> >> HadoopRDD right now.  There have been several cases in the past week
>> where
>> >> we've suggested that a user should read from hdfs themselves (eg., to
>> read
>> >> multiple files together in one partition) -- with*out* reusing the
>> code in
>> >> HadoopRDD, though they would lose things like the metric tracking &
>> >> preferred locations you get from HadoopRDD.  Does HadoopRDD need to
>> some
>> >> refactoring to make that easier to do?  Or do we just need a good
>> example?
>> >>
>> >> Imran
>> >>
>> >> (sorry for hijacking your thread, Koert)
>> >>
>> >>
>> >>
>> >> On Mon, Mar 23, 2015 at 3:52 PM, Koert Kuipers 
>> wrote:
>> >>
>> >> > see email below. reynold suggested i send it to dev instead of user
>> >> >
>> >> > -- Forwarded message --
>> >> > From: Koert Kuipers 
>> >> > Date: Mon, Mar 23, 2015 at 4:36 PM
>> >> > Subject: hadoop input/output format advanced control
>> >> > To: "u...@spark.apache.org" 
>> >> >
>> >> >
>> >> > currently its pretty hard to control the Hadoop Input/Output formats
>> used
>> >> > in Spark. The conventions seems to be to add extra parameters to all
>> >> > methods and then somewhere deep inside the code (for example in
>> >> > PairRDDFunctions.saveAsHadoopFile) all these parameters get
>> translated
>> >> into
>> >> > settings on the Hadoop Configuration object.
>> >> >
>> >> > for example for compression i see "codec: Option[Class[_ <:
>> >> > CompressionCodec]] = None" added to a bunch of methods.
>> >> >
>> >> > how scalable is this solution really?
>> >> >
>> >> > for example i need to read from a hadoop dataset and i dont want the
>> >> input
>> >> > (part) files to get split up. the way to do this is to set
>> >> > "mapred.min.split.size". now i dont want to set this at the level of
>> the
>> >> > SparkContext (which can be done), since i dont want it to apply to
>> input
>> >> > formats in general. i want it to apply to just this one specific
>> input
>> >> > dataset i need to read. which leaves me with no options currently. i
>> >> could
>> >> > go add yet another input parameter to all the methods
>> >> > (SparkContext.textFile, SparkContext.hadoopFile,
>> SparkContext.objectFile,
>> >> > etc.). but that seems ineffective.
>> >> >
>> >> > why can we not expose a Map[String, String] or some other generic
>> way to
>> >> > manipulate settings for hadoop input/output formats? it would require
>> >> > adding one more parameter to all methods to deal with hadoop
>> input/output
>> >> > formats, but after that its done. one parameter to rule them all
>> >> >
>> >> > then i could do:
>> >> > val x = sc.textFile("/some/path", formatSettings =
>> >> > Map("mapred.min.split.size" -> "12345"))
>> >> >
>> >> > or
>> >> > rdd.saveAsTextFile("/some/path, formatSettings =
>> >> > Map(mapred.output.compress" -> "tr

Re: Experience using binary packages on various Hadoop distros

2015-03-24 Thread Jey Kottalam
Could we gracefully fall back to an in-tree Hadoop binary (e.g. 1.0.4)
in that case? I think many new Spark users are confused about why
Spark has anything to do with Hadoop, e.g. I could see myself being
confused when the download page asks me to select a "package type". I
know that what I want is not "source code", but I'd have no idea how
to choose among the several binary package types on offer.

On Tue, Mar 24, 2015 at 2:28 PM, Matei Zaharia  wrote:
> Just a note, one challenge with the BYOH version might be that users who 
> download that can't run in local mode without also having Hadoop. But if we 
> describe it correctly then hopefully it's okay.
>
> Matei
>
>> On Mar 24, 2015, at 3:05 PM, Patrick Wendell  wrote:
>>
>> Hey All,
>>
>> For a while we've published binary packages with different Hadoop
>> client's pre-bundled. We currently have three interfaces to a Hadoop
>> cluster (a) the HDFS client (b) the YARN client (c) the Hive client.
>>
>> Because (a) and (b) are supposed to be backwards compatible
>> interfaces. My working assumption was that for the most part (modulo
>> Hive) our packages work with *newer* Hadoop versions. For instance,
>> our Hadoop 2.4 package should work with HDFS 2.6 and YARN 2.6.
>> However, I have heard murmurings that these are not compatible in
>> practice.
>>
>> So I have three questions I'd like to put out to the community:
>>
>> 1. Have people had difficulty using 2.4 packages with newer Hadoop
>> versions? If so, what specific incompatibilities have you hit?
>> 2. Have people had issues using our binary Hadoop packages in general
>> with commercial or Apache Hadoop distro's, such that you have to build
>> from source?
>> 3. How would people feel about publishing a "bring your own Hadoop"
>> binary, where you are required to point us to a local Hadoop
>> distribution by setting HADOOP_HOME? This might be better for ensuring
>> full compatibility:
>> https://issues.apache.org/jira/browse/SPARK-6511
>>
>> - Patrick
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Experience using binary packages on various Hadoop distros

2015-03-24 Thread Patrick Wendell
We can probably explain more clearly that if you are not using HDFS or
YARN, you can download any binary.

However, my question was about whether the existing binaries work well
with newer Hadoop versions. I have heard some people suggest they do
not, but I'm looking for more specific issues.

On Tue, Mar 24, 2015 at 4:16 PM, Jey Kottalam  wrote:
> Could we gracefully fallback to an in-tree Hadoop binary (e.g. 1.0.4)
> in that case? I think many new Spark users are confused about why
> Spark has anything to do with Hadoop, e.g. I could see myself being
> confused when the download page asks me to select a "package type". I
> know that what I want is not "source code", but I'd have no idea how
> to choose amongst the apparently multiple types of binaries.
>
> On Tue, Mar 24, 2015 at 2:28 PM, Matei Zaharia  
> wrote:
>> Just a note, one challenge with the BYOH version might be that users who 
>> download that can't run in local mode without also having Hadoop. But if we 
>> describe it correctly then hopefully it's okay.
>>
>> Matei
>>
>>> On Mar 24, 2015, at 3:05 PM, Patrick Wendell  wrote:
>>>
>>> Hey All,
>>>
>>> For a while we've published binary packages with different Hadoop
>>> client's pre-bundled. We currently have three interfaces to a Hadoop
>>> cluster (a) the HDFS client (b) the YARN client (c) the Hive client.
>>>
>>> Because (a) and (b) are supposed to be backwards compatible
>>> interfaces. My working assumption was that for the most part (modulo
>>> Hive) our packages work with *newer* Hadoop versions. For instance,
>>> our Hadoop 2.4 package should work with HDFS 2.6 and YARN 2.6.
>>> However, I have heard murmurings that these are not compatible in
>>> practice.
>>>
>>> So I have three questions I'd like to put out to the community:
>>>
>>> 1. Have people had difficulty using 2.4 packages with newer Hadoop
>>> versions? If so, what specific incompatibilities have you hit?
>>> 2. Have people had issues using our binary Hadoop packages in general
>>> with commercial or Apache Hadoop distro's, such that you have to build
>>> from source?
>>> 3. How would people feel about publishing a "bring your own Hadoop"
>>> binary, where you are required to point us to a local Hadoop
>>> distribution by setting HADOOP_HOME? This might be better for ensuring
>>> full compatibility:
>>> https://issues.apache.org/jira/browse/SPARK-6511
>>>
>>> - Patrick
>>>
>>> -
>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>
>>
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



RE: Using CUDA within Spark / boosting linear algebra

2015-03-24 Thread Ulanov, Alexander
Hi,

I am trying to use nvblas with netlib-java from Spark. The nvblas functions should
replace the current BLAS calls after setting LD_PRELOAD, as suggested in
http://docs.nvidia.com/cuda/nvblas/#Usage, without any changes to netlib-java.
It seems to work for a simple Java example, but I cannot make it work with Spark.
I run the following:
export LD_LIBRARY_PATH=/usr/local/cuda-6.5/lib64
env LD_PRELOAD=/usr/local/cuda-6.5/lib64/libnvblas.so ./spark-shell --driver-memory 4G
In nvidia-smi I observe that Java is using the GPU:
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID  Type  Process name                               Usage      |
|=============================================================================|
|    0      8873     C   bash                                          39MiB  |
|    0      8910     C   /usr/lib/jvm/java-1.7.0/bin/java              39MiB  |
+-----------------------------------------------------------------------------+

In the Spark shell I do a matrix multiplication and see the following:
15/03/25 06:48:01 INFO JniLoader: successfully loaded
/tmp/jniloader8192964377009965483netlib-native_system-linux-x86_64.so
So I am sure that netlib-native is loaded and cblas is supposedly being used.
However, the matrix multiplication executes on the CPU, since I see 16% CPU
usage and 0% GPU usage. I also checked different matrix sizes, from 100x100 to
12000x12000.
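
For what it's worth, a minimal check along these lines can be run in the same
spark-shell session (a sketch: Breeze ships with Spark, and the backend class
names are assumptions based on netlib-java's documentation):

  import breeze.linalg.DenseMatrix

  // Ask netlib-java which BLAS backend it actually bound at runtime.
  // Something like "...NativeSystemBLAS" suggests the system BLAS (and hence
  // an LD_PRELOADed nvblas) is in use; "...F2jBLAS" is the pure-Java fallback.
  println(com.github.fommil.netlib.BLAS.getInstance().getClass.getName)

  // Trigger a dgemm large enough to show up in nvidia-smi if nvblas intercepts it.
  val a = DenseMatrix.rand(4096, 4096)
  val b = DenseMatrix.rand(4096, 4096)
  val c = a * b
  println(c(0, 0))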

Could you suggest why LD_PRELOAD might not affect the Spark shell?

Best regards, Alexander



From: Sam Halliday [mailto:sam.halli...@gmail.com]
Sent: Monday, March 09, 2015 6:01 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org; Xiangrui Meng; Joseph Bradley; Evan R. Sparks
Subject: RE: Using CUDA within Spark / boosting linear algebra


Thanks so much for following up on this!

Hmm, I wonder if we should have a concerted effort to chart performance on 
various pieces of hardware...
On 9 Mar 2015 21:08, "Ulanov, Alexander"  wrote:
Hi Everyone, I've updated the benchmark as Xiangrui suggested. Added the 
comment that BIDMat 0.9.7 uses Float matrices in GPU (although I see the 
support of Double in the current source code), did the test with BIDMat and CPU 
Double matrices. BIDMat MKL is indeed on par with netlib MKL.

https://docs.google.com/spreadsheets/d/1lWdVSuSragOobb0A_oeouQgHUMx378T9J5r7kwKSPkY/edit?usp=sharing

Best regards, Alexander

-Original Message-
From: Sam Halliday 
[mailto:sam.halli...@gmail.com]
Sent: Tuesday, March 03, 2015 1:54 PM
To: Xiangrui Meng; Joseph Bradley
Cc: Evan R. Sparks; Ulanov, Alexander; 
dev@spark.apache.org
Subject: Re: Using CUDA within Spark / boosting linear algebra

BTW, is anybody on this list going to the London Meetup in a few weeks?

https://skillsmatter.com/meetups/6987-apache-spark-living-the-post-mapreduce-world#community

Would be nice to meet other people working on the guts of Spark! :-)


Xiangrui Meng  writes:

> Hey Alexander,
>
> I don't quite understand the part where netlib-cublas is about 20x
> slower than netlib-openblas. What is the overhead of using a GPU BLAS
> with netlib-java?
>
> CC'ed Sam, the author of netlib-java.
>
> Best,
> Xiangrui
>
> On Wed, Feb 25, 2015 at 3:36 PM, Joseph Bradley  wrote:
>> Better documentation for linking would be very helpful!  Here's a JIRA:
>> https://issues.apache.org/jira/browse/SPARK-6019
>>
>>
>> On Wed, Feb 25, 2015 at 2:53 PM, Evan R. Sparks  wrote:
>>
>>> Thanks for compiling all the data and running these benchmarks,
>>> Alex. The big takeaways here can be seen with this chart:
>>>
>>> https://docs.google.com/spreadsheets/d/1aRm2IADRfXQV7G2vrcVh4StF50uZ
>>> Hl6kmAJeaZZggr0/pubchart?oid=1899767119&format=interactive
>>>
>>> 1) A properly configured GPU matrix multiply implementation (e.g.
>>> BIDMat+GPU) can provide substantial (but less than an order of magnitude)
>>> benefit over a well-tuned CPU implementation (e.g. BIDMat+MKL or
>>> netlib-java+openblas-compiled).
>>> 2) A poorly tuned CPU implementation can be 1-2 orders of magnitude
>>> worse than a well-tuned CPU implementation, particularly for larger 
>>> matrices.
>>> (netlib-f2jblas or netlib-ref) This is not to pick on netlib - this
>>> basically agrees with the authors own benchmarks (
>>> https://github.com/fommil/netlib-java)
>>>
>>> I think that most of our users are in a situation where using GPUs
>>> may not be practical - although we could consider having a good GPU
>>> backend available as an option. However, *ALL* users of MLlib could
>>> benefit (potentially tremendously) from using a well-tuned CPU-based
>>> BLAS implementation. Perhaps we should consider updating the mllib
>>> guide with a more complete section for enabling high performance

Re: Understanding shuffle file name conflicts

2015-03-24 Thread Saisai Shao
Hi Kannan,

As far as I know, the shuffle id in ShuffleDependency is incremented, so even
if you run the same job twice the shuffle dependencies, and hence the shuffle
ids, are different. Since the shuffle file name is built from
(shuffleId + mapId + reduceId), the names change as well, so there should be
no conflict even in the same directory.
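
Roughly, the naming works like the following sketch (a simplified stand-in for
ShuffleBlockId in org.apache.spark.storage, not the real class):

  // Shuffle data/index files are named after (shuffleId, mapId, reduceId).
  case class SketchShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
    def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
  }

  println(SketchShuffleBlockId(0, 0, 0).name)  // "shuffle_0_0_0" -> shuffle_0_0_0.data / .index
  println(SketchShuffleBlockId(1, 0, 0).name)  // a later shuffle gets "shuffle_1_0_0"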

Thanks
Jerry


2015-03-25 1:56 GMT+08:00 Kannan Rajah :

> I am working on SPARK-1529. I ran into an issue with my change, where the
> same shuffle file was being reused across 2 jobs. Please note this only
> happens when I use a hard coded location to use for shuffle files, say
> "/tmp". It does not happen with normal code path that uses DiskBlockManager
> to pick different directories for each run. So I want to understand how
> DiskBlockManager guarantees that such a conflict will never happen.
>
> Let's say the shuffle block id has a value of shuffle_0_0_0. So the data
> file name is shuffle_0_0_0.data and index file name is shuffle_0_0_0.index.
> If I run a spark job twice, one after another, these files get created
> under different directories because of the hashing logic in
> DiskBlockManager. But the hash is based off the file name, so how are we
> sure that there won't be a conflict ever?
>
> --
> Kannan
>


Spark SQL(1.3.0) "import sqlContext.implicits._" seems not work for converting a case class RDD to DataFrame

2015-03-24 Thread Zhiwei Chan
Hi all,

  I just upgraded Spark from 1.2.1 to 1.3.0 and changed "import
sqlContext.createSchemaRDD" to "import sqlContext.implicits._" in my code.
(I scanned the programming guide and it seems this is the only change I need
to make.) But compilation fails with the following error:
>>>
[ERROR] ...\magic.scala:527: error: value registerTempTable is not a member
of org.apache.spark.rdd.RDD[com.yhd.ycache.magic.Table]
[INFO] tableRdd.registerTempTable(tableName)
<<<

Then I tried exactly the example from the 1.3 programming guide in
spark-shell, and it hits the same error.
>>>
scala> sys.env.get("CLASSPATH")
res7: Option[String] =
Some(:/root/scala/spark-1.3.0-bin-hadoop2.4/conf:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar)

scala>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext =
org.apache.spark.sql.SQLContext@4b05b3ff

scala>  import sqlContext.implicits._
import sqlContext.implicits._

scala>  case class Person(name: String, age: Int)
defined class Person

scala>   val t1 =
sc.textFile("hdfs://heju:8020/user/root/magic/poolInfo.txt")
15/03/25 11:13:35 INFO MemoryStore: ensureFreeSpace(81443) called with
curMem=186397, maxMem=278302556
15/03/25 11:13:35 INFO MemoryStore: Block broadcast_3 stored as values in
memory (estimated size 79.5 KB, free 265.2 MB)
15/03/25 11:13:35 INFO MemoryStore: ensureFreeSpace(31262) called with
curMem=267840, maxMem=278302556
15/03/25 11:13:35 INFO MemoryStore: Block broadcast_3_piece0 stored as
bytes in memory (estimated size 30.5 KB, free 265.1 MB)
15/03/25 11:13:35 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on heju:48885 (size: 30.5 KB, free: 265.4 MB)
15/03/25 11:13:35 INFO BlockManagerMaster: Updated info of block
broadcast_3_piece0
15/03/25 11:13:35 INFO SparkContext: Created broadcast 3 from textFile at
<console>:34
t1: org.apache.spark.rdd.RDD[String] =
hdfs://heju:8020/user/root/magic/poolInfo.txt MapPartitionsRDD[9] at
textFile at <console>:34

scala>  val t2 = t1.flatMap(_.split("\n")).map(_.split(" ")).map(p =>
Person(p(0),1))
t2: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[12] at map at
<console>:38

scala>  t2.registerTempTable("people")
<console>:41: error: value registerTempTable is not a member of
org.apache.spark.rdd.RDD[Person]
   t2.registerTempTable("people")
  ^
<<<

I found the following explanation in the programming guide about implicitly
converting case classes to DataFrames, but I don't understand what I should do.
Could anyone tell me what I should do if I want to convert a case class RDD
to a DataFrame?

>>>
Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)

Many of the code examples prior to Spark 1.3 started with import
sqlContext._, which brought all of the functions from sqlContext into
scope. In Spark 1.3 we have isolated the implicit conversions for
converting RDDs into DataFrames into an object inside of the SQLContext.
Users should now write import sqlContext.implicits._.

Additionally, the implicit conversions now only augment RDDs that are
composed of Products (i.e., case classes or tuples) with a method toDF,
instead of applying automatically.

<<<
Thanks
Jason


Re: Spark SQL(1.3.0) "import sqlContext.implicits._" seems not work for converting a case class RDD to DataFrame

2015-03-24 Thread Ted Yu
Please take a look at:
./sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala
./sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala

Cheers

On Tue, Mar 24, 2015 at 8:46 PM, Zhiwei Chan 
wrote:

> Hi all,
>
>   I just upgraded spark from 1.2.1 to 1.3.0, and changed the "import
> sqlContext.createSchemaRDD" to "import sqlContext.implicits._" in my code.
> (I scan the programming guide and it seems this is the only change I need
> to do). But it come to an error when run compile as following:
> >>>
> [ERROR] ...\magic.scala:527: error: value registerTempTable is not a member
> of org.apache.spark.rdd.RDD[com.yhd.ycache.magic.Table]
> [INFO] tableRdd.registerTempTable(tableName)
> <<<
>
> Then I try the exactly example in the programming guide of 1.3  in
> spark-shell, it come to the same error.
> >>>
> scala> sys.env.get("CLASSPATH")
> res7: Option[String] =
>
> Some(:/root/scala/spark-1.3.0-bin-hadoop2.4/conf:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar)
>
> scala>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> sqlContext: org.apache.spark.sql.SQLContext =
> org.apache.spark.sql.SQLContext@4b05b3ff
>
> scala>  import sqlContext.implicits._
> import sqlContext.implicits._
>
> scala>  case class Person(name: String, age: Int)
> defined class Person
>
> scala>   val t1 =
> sc.textFile("hdfs://heju:8020/user/root/magic/poolInfo.txt")
> 15/03/25 11:13:35 INFO MemoryStore: ensureFreeSpace(81443) called with
> curMem=186397, maxMem=278302556
> 15/03/25 11:13:35 INFO MemoryStore: Block broadcast_3 stored as values in
> memory (estimated size 79.5 KB, free 265.2 MB)
> 15/03/25 11:13:35 INFO MemoryStore: ensureFreeSpace(31262) called with
> curMem=267840, maxMem=278302556
> 15/03/25 11:13:35 INFO MemoryStore: Block broadcast_3_piece0 stored as
> bytes in memory (estimated size 30.5 KB, free 265.1 MB)
> 15/03/25 11:13:35 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on heju:48885 (size: 30.5 KB, free: 265.4 MB)
> 15/03/25 11:13:35 INFO BlockManagerMaster: Updated info of block
> broadcast_3_piece0
> 15/03/25 11:13:35 INFO SparkContext: Created broadcast 3 from textFile at
> :34
> t1: org.apache.spark.rdd.RDD[String] =
> hdfs://heju:8020/user/root/magic/poolInfo.txt MapPartitionsRDD[9] at
> textFile at :34
>
> scala>  val t2 = t1.flatMap(_.split("\n")).map(_.split(" ")).map(p =>
> Person(p(0),1))
> t2: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[12] at map at
> :38
>
> scala>  t2.registerTempTable("people")
> :41: error: value registerTempTable is not a member of
> org.apache.spark.rdd.RDD[Person]
>t2.registerTempTable("people")
>   ^
> <<<
>
> I found the following explanation in programming guide about implicit
> convert case class to DataFrams, but I don't understand what I should do.
> Could any one tell me how should I do if I want to convert a case class RDD
> to DataFrame?
>
> >>>
> Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)
>
> Many of the code examples prior to Spark 1.3 started with import
> sqlContext._, which brought all of the functions from sqlContext into
> scope. In Spark 1.3 we have isolated the implicit conversions for
> converting RDDs into DataFrames into an object inside of the SQLContext.
> Users should now write import sqlContext.implicits._.
>
> Additionally, the implicit conversions now only augment RDDs that are
> composed of Products (i.e., case classes or tuples) with a method toDF,
> instead of applying automatically.
>
> <<<
> Thanks
> Jason
>


Re: Spark SQL(1.3.0) "import sqlContext.implicits._" seems not work for converting a case class RDD to DataFrame

2015-03-24 Thread Reynold Xin
In particular:

http://spark.apache.org/docs/latest/sql-programming-guide.html


"Additionally, the implicit conversions now only augment RDDs that are
composed of Products (i.e., case classes or tuples) with a method toDF,
instead of applying automatically."
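
Concretely, for the session in the original mail the fix should be an explicit
toDF() call before registering the temp table (a sketch reusing the same names
and path as above):

  import sqlContext.implicits._

  case class Person(name: String, age: Int)

  val t2 = sc.textFile("hdfs://heju:8020/user/root/magic/poolInfo.txt")
    .flatMap(_.split("\n"))
    .map(_.split(" "))
    .map(p => Person(p(0), 1))

  // In 1.3 the RDD is no longer converted automatically; toDF() returns the
  // DataFrame, and registerTempTable lives on the DataFrame.
  val people = t2.toDF()
  people.registerTempTable("people")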



On Tue, Mar 24, 2015 at 9:07 PM, Ted Yu  wrote:

> Please take a look at:
> ./sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala
> ./sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
>
> Cheers
>
> On Tue, Mar 24, 2015 at 8:46 PM, Zhiwei Chan 
> wrote:
>
> > Hi all,
> >
> >   I just upgraded spark from 1.2.1 to 1.3.0, and changed the "import
> > sqlContext.createSchemaRDD" to "import sqlContext.implicits._" in my
> code.
> > (I scan the programming guide and it seems this is the only change I need
> > to do). But it come to an error when run compile as following:
> > >>>
> > [ERROR] ...\magic.scala:527: error: value registerTempTable is not a
> member
> > of org.apache.spark.rdd.RDD[com.yhd.ycache.magic.Table]
> > [INFO] tableRdd.registerTempTable(tableName)
> > <<<
> >
> > Then I try the exactly example in the programming guide of 1.3  in
> > spark-shell, it come to the same error.
> > >>>
> > scala> sys.env.get("CLASSPATH")
> > res7: Option[String] =
> >
> >
> Some(:/root/scala/spark-1.3.0-bin-hadoop2.4/conf:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/root/scala/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar)
> >
> > scala>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> > sqlContext: org.apache.spark.sql.SQLContext =
> > org.apache.spark.sql.SQLContext@4b05b3ff
> >
> > scala>  import sqlContext.implicits._
> > import sqlContext.implicits._
> >
> > scala>  case class Person(name: String, age: Int)
> > defined class Person
> >
> > scala>   val t1 =
> > sc.textFile("hdfs://heju:8020/user/root/magic/poolInfo.txt")
> > 15/03/25 11:13:35 INFO MemoryStore: ensureFreeSpace(81443) called with
> > curMem=186397, maxMem=278302556
> > 15/03/25 11:13:35 INFO MemoryStore: Block broadcast_3 stored as values in
> > memory (estimated size 79.5 KB, free 265.2 MB)
> > 15/03/25 11:13:35 INFO MemoryStore: ensureFreeSpace(31262) called with
> > curMem=267840, maxMem=278302556
> > 15/03/25 11:13:35 INFO MemoryStore: Block broadcast_3_piece0 stored as
> > bytes in memory (estimated size 30.5 KB, free 265.1 MB)
> > 15/03/25 11:13:35 INFO BlockManagerInfo: Added broadcast_3_piece0 in
> memory
> > on heju:48885 (size: 30.5 KB, free: 265.4 MB)
> > 15/03/25 11:13:35 INFO BlockManagerMaster: Updated info of block
> > broadcast_3_piece0
> > 15/03/25 11:13:35 INFO SparkContext: Created broadcast 3 from textFile at
> > :34
> > t1: org.apache.spark.rdd.RDD[String] =
> > hdfs://heju:8020/user/root/magic/poolInfo.txt MapPartitionsRDD[9] at
> > textFile at :34
> >
> > scala>  val t2 = t1.flatMap(_.split("\n")).map(_.split(" ")).map(p =>
> > Person(p(0),1))
> > t2: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[12] at map at
> > :38
> >
> > scala>  t2.registerTempTable("people")
> > :41: error: value registerTempTable is not a member of
> > org.apache.spark.rdd.RDD[Person]
> >t2.registerTempTable("people")
> >   ^
> > <<<
> >
> > I found the following explanation in programming guide about implicit
> > convert case class to DataFrams, but I don't understand what I should do.
> > Could any one tell me how should I do if I want to convert a case class
> RDD
> > to DataFrame?
> >
> > >>>
> > Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)
> >
> > Many of the code examples prior to Spark 1.3 started with import
> > sqlContext._, which brought all of the functions from sqlContext into
> > scope. In Spark 1.3 we have isolated the implicit conversions for
> > converting RDDs into DataFrames into an object inside of the SQLContext.
> > Users should now write import sqlContext.implicits._.
> >
> > Additionally, the implicit conversions now only augment RDDs that are
> > composed of Products (i.e., case classes or tuples) with a method toDF,
> > instead of applying automatically.
> >
> > <<<
> > Thanks
> > Jason
> >
>


Re: hadoop input/output format advanced control

2015-03-24 Thread Patrick Wendell
I see - if you look, the saving functions already give the user the
option to pass an arbitrary Configuration.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L894

It seems fine to have the same option for the loading functions, if
it's easy to just pass this config into the input format.
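
For reference, a per-dataset Hadoop setting can already be approximated today
by going through the new-API entry points, which do accept a Configuration (a
sketch; the input format and the split-size key are just illustrative, and
newer Hadoop spells the key mapreduce.input.fileinputformat.split.minsize):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

  // Copy the context-wide Hadoop conf and override one setting for this
  // dataset only, without affecting other input formats.
  val conf = new Configuration(sc.hadoopConfiguration)
  conf.set("mapred.min.split.size", "12345")

  val lines = sc.newAPIHadoopFile(
      "/some/path",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      conf)
    .map(_._2.toString)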



On Tue, Mar 24, 2015 at 3:46 PM, Koert Kuipers  wrote:
> the (compression) codec parameter that is now part of many saveAs... methods
> came from a similar need. see SPARK-763
> hadoop has many options like this. you either going to have to allow many
> more of these optional arguments to all the methods that read from hadoop
> inputformats and write to hadoop outputformats, or you force people to
> re-create these methods using HadoopRDD, i think (if thats even possible).
>
> On Tue, Mar 24, 2015 at 6:40 PM, Koert Kuipers  wrote:
>>
>> i would like to use objectFile with some tweaks to the hadoop conf.
>> currently there is no way to do that, except recreating objectFile myself.
>> and some of the code objectFile uses i have no access to, since its private
>> to spark.
>>
>>
>> On Tue, Mar 24, 2015 at 2:59 PM, Patrick Wendell 
>> wrote:
>>>
>>> Yeah - to Nick's point, I think the way to do this is to pass in a
>>> custom conf when you create a Hadoop RDD (that's AFAIK why the conf
>>> field is there). Is there anything you can't do with that feature?
>>>
>>> On Tue, Mar 24, 2015 at 11:50 AM, Nick Pentreath
>>>  wrote:
>>> > Imran, on your point to read multiple files together in a partition, is
>>> > it
>>> > not simpler to use the approach of copy Hadoop conf and set per-RDD
>>> > settings for min split to control the input size per partition,
>>> > together
>>> > with something like CombineFileInputFormat?
>>> >
>>> > On Tue, Mar 24, 2015 at 5:28 PM, Imran Rashid 
>>> > wrote:
>>> >
>>> >> I think this would be a great addition, I totally agree that you need
>>> >> to be
>>> >> able to set these at a finer context than just the SparkContext.
>>> >>
>>> >> Just to play devil's advocate, though -- the alternative is for you
>>> >> just
>>> >> subclass HadoopRDD yourself, or make a totally new RDD, and then you
>>> >> could
>>> >> expose whatever you need.  Why is this solution better?  IMO the
>>> >> criteria
>>> >> are:
>>> >> (a) common operations
>>> >> (b) error-prone / difficult to implement
>>> >> (c) non-obvious, but important for performance
>>> >>
>>> >> I think this case fits (a) & (c), so I think its still worthwhile.
>>> >> But its
>>> >> also worth asking whether or not its too difficult for a user to
>>> >> extend
>>> >> HadoopRDD right now.  There have been several cases in the past week
>>> >> where
>>> >> we've suggested that a user should read from hdfs themselves (eg., to
>>> >> read
>>> >> multiple files together in one partition) -- with*out* reusing the
>>> >> code in
>>> >> HadoopRDD, though they would lose things like the metric tracking &
>>> >> preferred locations you get from HadoopRDD.  Does HadoopRDD need to
>>> >> some
>>> >> refactoring to make that easier to do?  Or do we just need a good
>>> >> example?
>>> >>
>>> >> Imran
>>> >>
>>> >> (sorry for hijacking your thread, Koert)
>>> >>
>>> >>
>>> >>
>>> >> On Mon, Mar 23, 2015 at 3:52 PM, Koert Kuipers 
>>> >> wrote:
>>> >>
>>> >> > see email below. reynold suggested i send it to dev instead of user
>>> >> >
>>> >> > -- Forwarded message --
>>> >> > From: Koert Kuipers 
>>> >> > Date: Mon, Mar 23, 2015 at 4:36 PM
>>> >> > Subject: hadoop input/output format advanced control
>>> >> > To: "u...@spark.apache.org" 
>>> >> >
>>> >> >
>>> >> > currently its pretty hard to control the Hadoop Input/Output formats
>>> >> > used
>>> >> > in Spark. The conventions seems to be to add extra parameters to all
>>> >> > methods and then somewhere deep inside the code (for example in
>>> >> > PairRDDFunctions.saveAsHadoopFile) all these parameters get
>>> >> > translated
>>> >> into
>>> >> > settings on the Hadoop Configuration object.
>>> >> >
>>> >> > for example for compression i see "codec: Option[Class[_ <:
>>> >> > CompressionCodec]] = None" added to a bunch of methods.
>>> >> >
>>> >> > how scalable is this solution really?
>>> >> >
>>> >> > for example i need to read from a hadoop dataset and i dont want the
>>> >> input
>>> >> > (part) files to get split up. the way to do this is to set
>>> >> > "mapred.min.split.size". now i dont want to set this at the level of
>>> >> > the
>>> >> > SparkContext (which can be done), since i dont want it to apply to
>>> >> > input
>>> >> > formats in general. i want it to apply to just this one specific
>>> >> > input
>>> >> > dataset i need to read. which leaves me with no options currently. i
>>> >> could
>>> >> > go add yet another input parameter to all the methods
>>> >> > (SparkContext.textFile, SparkContext.hadoopFile,
>>> >> > SparkContext.objectFile,
>>> >> > etc.). but that seems ineffective.
>>> >> >
>>> >> > why can we n

Re: Understanding shuffle file name conflicts

2015-03-24 Thread Kannan Rajah
Saisai,
This is not the case when I use spark-submit to run 2 jobs, one after
another. The shuffle id remains the same.


--
Kannan

On Tue, Mar 24, 2015 at 7:35 PM, Saisai Shao  wrote:

> Hi Kannan,
>
> As I know the shuffle Id in ShuffleDependency will be increased, so even
> if you run the same job twice, the shuffle dependency as well as shuffle id
> is different, so the shuffle file name which is combined by
> (shuffleId+mapId+reduceId) will be changed, so there's no name conflict
> even in the same directory as I know.
>
> Thanks
> Jerry
>
>
> 2015-03-25 1:56 GMT+08:00 Kannan Rajah :
>
>> I am working on SPARK-1529. I ran into an issue with my change, where the
>> same shuffle file was being reused across 2 jobs. Please note this only
>> happens when I use a hard coded location to use for shuffle files, say
>> "/tmp". It does not happen with normal code path that uses
>> DiskBlockManager
>> to pick different directories for each run. So I want to understand how
>> DiskBlockManager guarantees that such a conflict will never happen.
>>
>> Let's say the shuffle block id has a value of shuffle_0_0_0. So the data
>> file name is shuffle_0_0_0.data and index file name is
>> shuffle_0_0_0.index.
>> If I run a spark job twice, one after another, these files get created
>> under different directories because of the hashing logic in
>> DiskBlockManager. But the hash is based off the file name, so how are we
>> sure that there won't be a conflict ever?
>>
>> --
>> Kannan
>>
>
>


Re: Understanding shuffle file name conflicts

2015-03-24 Thread Josh Rosen
Which version of Spark are you using?  What do you mean when you say that
you used a hardcoded location for shuffle files?

If you look at the current DiskBlockManager code, it looks like it will
create a per-application subdirectory in each of the local root directories.

Here's the call to create a subdirectory in each root dir:
https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L126

This call to Utils.createDirectory() should result in a fresh subdirectory
being created for just this application (note the use of random UUIDs, plus
the check to ensure that the directory doesn't already exist):
https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/util/Utils.scala#L273

So, although the filenames for shuffle files are not globally unique, their
full paths should be unique due to these unique per-application
subdirectories.  Have you observed an instance where this isn't the case?
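
To make that concrete, here is a minimal sketch of the lookup (simplified from
the DiskBlockManager.getFile code linked above, not the real implementation):

  // Hash the file name into one of the per-application local dirs and one of
  // the fixed number of subdirectories inside it.
  def getFile(localDirs: Array[String], subDirsPerLocalDir: Int, filename: String): String = {
    val hash = math.abs(filename.hashCode)   // stand-in for Utils.nonNegativeHash
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    "%s/%02x/%s".format(localDirs(dirId), subDirId, filename)
  }

  // The roots already carry a per-application random component, so two apps
  // hashing the same "shuffle_0_0_0.data" still land on different full paths:
  println(getFile(Array("/tmp/spark-local-app1-uuid"), 64, "shuffle_0_0_0.data"))
  println(getFile(Array("/tmp/spark-local-app2-uuid"), 64, "shuffle_0_0_0.data"))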

- Josh

On Tue, Mar 24, 2015 at 11:04 PM, Kannan Rajah  wrote:

> Saisai,
> This is the not the case when I use spark-submit to run 2 jobs, one after
> another. The shuffle id remains the same.
>
>
> --
> Kannan
>
> On Tue, Mar 24, 2015 at 7:35 PM, Saisai Shao 
> wrote:
>
> > Hi Kannan,
> >
> > As I know the shuffle Id in ShuffleDependency will be increased, so even
> > if you run the same job twice, the shuffle dependency as well as shuffle
> id
> > is different, so the shuffle file name which is combined by
> > (shuffleId+mapId+reduceId) will be changed, so there's no name conflict
> > even in the same directory as I know.
> >
> > Thanks
> > Jerry
> >
> >
> > 2015-03-25 1:56 GMT+08:00 Kannan Rajah :
> >
> >> I am working on SPARK-1529. I ran into an issue with my change, where
> the
> >> same shuffle file was being reused across 2 jobs. Please note this only
> >> happens when I use a hard coded location to use for shuffle files, say
> >> "/tmp". It does not happen with normal code path that uses
> >> DiskBlockManager
> >> to pick different directories for each run. So I want to understand how
> >> DiskBlockManager guarantees that such a conflict will never happen.
> >>
> >> Let's say the shuffle block id has a value of shuffle_0_0_0. So the data
> >> file name is shuffle_0_0_0.data and index file name is
> >> shuffle_0_0_0.index.
> >> If I run a spark job twice, one after another, these files get created
> >> under different directories because of the hashing logic in
> >> DiskBlockManager. But the hash is based off the file name, so how are we
> >> sure that there won't be a conflict ever?
> >>
> >> --
> >> Kannan
> >>
> >
> >
>


Re: Understanding shuffle file name conflicts

2015-03-24 Thread Saisai Shao
Yes, as Josh said, when an application is started Spark creates a unique
application-wide folder for its temporary files. Jobs within the
application get unique shuffle ids, and therefore unique file names, so
shuffle stages within an app will not run into name conflicts.

Shuffle files from different applications are also separated by the
application folder, so name conflicts cannot happen across applications.

Maybe you changed some parts of this code while working on the patch.

Thanks
Jerry


2015-03-25 14:22 GMT+08:00 Josh Rosen :

> Which version of Spark are you using?  What do you mean when you say that
> you used a hardcoded location for shuffle files?
>
> If you look at the current DiskBlockManager code, it looks like it will
> create a per-application subdirectory in each of the local root directories.
>
> Here's the call to create a subdirectory in each root dir:
> https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L126
>
> This call to Utils.createDirectory() should result in a fresh subdirectory
> being created for just this application (note the use of random UUIDs, plus
> the check to ensure that the directory doesn't already exist):
>
> https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/util/Utils.scala#L273
>
> So, although the filenames for shuffle files are not globally unique,
> their full paths should be unique due to these unique per-application
> subdirectories.  Have you observed an instance where this isn't the case?
>
> - Josh
>
> On Tue, Mar 24, 2015 at 11:04 PM, Kannan Rajah 
> wrote:
>
>> Saisai,
>> This is the not the case when I use spark-submit to run 2 jobs, one after
>> another. The shuffle id remains the same.
>>
>>
>> --
>> Kannan
>>
>> On Tue, Mar 24, 2015 at 7:35 PM, Saisai Shao 
>> wrote:
>>
>> > Hi Kannan,
>> >
>> > As I know the shuffle Id in ShuffleDependency will be increased, so even
>> > if you run the same job twice, the shuffle dependency as well as
>> shuffle id
>> > is different, so the shuffle file name which is combined by
>> > (shuffleId+mapId+reduceId) will be changed, so there's no name conflict
>> > even in the same directory as I know.
>> >
>> > Thanks
>> > Jerry
>> >
>> >
>> > 2015-03-25 1:56 GMT+08:00 Kannan Rajah :
>> >
>> >> I am working on SPARK-1529. I ran into an issue with my change, where
>> the
>> >> same shuffle file was being reused across 2 jobs. Please note this only
>> >> happens when I use a hard coded location to use for shuffle files, say
>> >> "/tmp". It does not happen with normal code path that uses
>> >> DiskBlockManager
>> >> to pick different directories for each run. So I want to understand how
>> >> DiskBlockManager guarantees that such a conflict will never happen.
>> >>
>> >> Let's say the shuffle block id has a value of shuffle_0_0_0. So the
>> data
>> >> file name is shuffle_0_0_0.data and index file name is
>> >> shuffle_0_0_0.index.
>> >> If I run a spark job twice, one after another, these files get created
>> >> under different directories because of the hashing logic in
>> >> DiskBlockManager. But the hash is based off the file name, so how are
>> we
>> >> sure that there won't be a conflict ever?
>> >>
>> >> --
>> >> Kannan
>> >>
>> >
>> >
>>
>
>