Re: orc read issue in spark

2015-11-18 Thread Reynold Xin
What do you mean by "starts delay scheduling"? Are you saying it is no longer
doing local reads?

If that's the case, you can increase the spark.locality.wait timeout.
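For example, via --conf spark.locality.wait=10s on spark-submit, or programmatically (the 10s value is only illustrative; tune it for your job):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.locality.wait", "10s")
// pass conf to the SparkContext when it is created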

On Wednesday, November 18, 2015, Renu Yadav  wrote:

> Hi ,
> I am using spark 1.4.1 and saving orc file using
> df.write.format("orc").save("outputlocation")
>
> output location size: 440GB
>
> and while reading df.read.format("orc").load("outputlocation").count
>
>
> It has 2618 partitions.
> The count operation runs fine until about 2,500 tasks, but delay scheduling
> kicks in after that, which results in slow performance.
>
> *If anyone has any idea on this, please do reply, as I need this very
> urgently.*
>
> Thanks in advance
>
>
> Regards,
> Renu Yadav
>
>
>


Re: How to Add builtin geometry type to SparkSQL?

2015-11-18 Thread Reynold Xin
Have you looked into https://github.com/harsha2010/magellan ?

On Wednesday, November 18, 2015, ddcd  wrote:

> Hi all,
>
> I'm considering adding a geometry type to SparkSQL.
>
> I know that there is a project named sparkGIS, which is an add-on to SparkSQL.
> The project uses user defined types and user defined functions.
>
> But I think a built-in type would be better in that it would allow more
> optimization. I'm reading the SparkSQL code, but it's really difficult for
> me. Is there any document that would help?
>
> Thank you!
>
>
>
>


Re: Support for local disk columnar storage for DataFrames

2015-11-18 Thread Cristian O
Hi,

While these OSS efforts are interesting, they're quite unproven for now.
Personally, I would be much more interested in seeing Spark incrementally
move towards supporting updates to DataFrames on various storage
substrates, starting locally, perhaps as an extension of cached
DataFrames.

However, before we get full-blown update support, I would suggest two
enhancements that are fairly straightforward with the current design. If
they make sense, please let me know and I'll file them as JIRAs:

1. Checkpoint support for DataFrames - as mentioned, this can be as simple
as saving to a Parquet file or some other format, but it would not require
re-reading the file to alter the lineage, and it would also prune the logical
plan (a rough sketch follows below). Alternatively, checkpointing a cached
DataFrame could delegate to checkpointing the underlying RDD, but again it
needs to prune the logical plan.

2. Efficient transformation of cached DataFrames to cached DataFrames - an
efficient copy-on-write mechanism can be used to avoid unpacking
CachedBatches (row groups) into InternalRows when building a cached
DataFrame out of a source cached DataFrame through transformations (like an
outer join) that only affect a small subset of rows. Statistics and
partitioning information can be used to determine which row groups are
affected and which can be copied *by reference* unchanged. This would
effectively allow performing immutable updates of cached DataFrames in
scenarios like Streaming or other iterative use cases like ML.
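To make (1) concrete, here is a minimal sketch of the manual round trip needed
today versus the proposed behaviour; the checkpoint() method and the path are
purely illustrative, not an existing API, and df stands for any DataFrame:

// today: checkpointing by hand means writing out and explicitly reading back
df.write.parquet("/tmp/checkpoints/step42")
val checkpointed = sqlContext.read.parquet("/tmp/checkpoints/step42")

// proposed: keep using the same DataFrame, with its logical plan pruned and
// its data backed by the saved copy (hypothetical API):
// val checkpointed = df.checkpoint()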

Thanks,
Cristian



On 16 November 2015 at 08:30, Mark Hamstra  wrote:

> FiloDB is also closely related.  https://github.com/tuplejump/FiloDB
>
> On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath  > wrote:
>
>> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
>> input/output format support:
>> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>>
>> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin  wrote:
>>
>>> This (updates) is something we are going to think about in the next
>>> release or two.
>>>
>>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
>>> cristian.b.op...@googlemail.com> wrote:
>>>
 Sorry, apparently I only replied to Reynold when I meant to copy the list as
 well, so I'm self-replying and taking the opportunity to illustrate with an
 example.

 Basically I want to conceptually do this:

 import sqlContext.implicits._

 val bigDf = sqlContext.sparkContext.parallelize(1 to 1000000).map(i => (i, 1)).toDF("k", "v")
 val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 5)).map(i => (i, 1)).toDF("k", "v")

 bigDf.cache()

 bigDf.registerTempTable("big")
 deltaDf.registerTempTable("delta")

 val newBigDf = sqlContext.sql(
   "SELECT big.k, big.v + IF(delta.v IS NULL, 0, delta.v) FROM big LEFT JOIN delta ON big.k = delta.k")

 newBigDf.cache()
 bigDf.unpersist()


 This is essentially an update of keys "1" and "5" only, in a
 dataset of 1 million keys.

 This can be achieved efficiently if the join would preserve the cached
 blocks that have been unaffected, and only copy and mutate the 2 affected
 blocks corresponding to the matching join keys.

 Statistics can determine which blocks actually need mutating. Note also
 that shuffling is not required assuming both dataframes are pre-partitioned
 by the same key K.

 In SQL this could actually be expressed as an UPDATE statement or for a
 more generalized use as a MERGE UPDATE:
 https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx

 While this may seem like a very special case optimization, it would
 effectively implement UPDATE support for cached DataFrames, for both
 optimal and non-optimal usage.

 I appreciate there's quite a lot here, so thank you for taking the time
 to consider it.

 Cristian



 On 12 November 2015 at 15:49, Cristian O <
 cristian.b.op...@googlemail.com> wrote:

> Hi Reynold,
>
> Thanks for your reply.
>
> Parquet may very well be used as the underlying implementation, but
> this is more than about a particular storage representation.
>
> There are a few things here that are inter-related and open different
> possibilities, so it's hard to structure, but I'll give it a try:
>
> 1. Checkpointing DataFrames - while a DF can be saved locally as
> parquet, just using that as a checkpoint would currently require
> explicitly reading it back. A proper checkpoint implementation would just
> save (perhaps asynchronously) and prune the logical plan while allowing
> the same DF, now backed by the checkpoint, to continue to be used.
>
> It's important to prune the logical plan to avoid all kinds of issues
> that may arise from unbounded lineage growth.

Spark Summit East 2016 CFP - Closing in 5 days

2015-11-18 Thread Scott walent
Hi Spark Devs and Users,

The CFP for Spark Summit East 2016 (https://spark-summit.org) is closing
this weekend. As the leading event for Apache Spark, this is the chance to
both share key learnings and to gain insights from the creators of Spark,
developers, vendors and peers who are using Spark.

We are looking for presenters who would like to showcase how Spark and its
related technologies are used in a variety of ways, including Applications,
Developer, Research, Data Science, and our new-for-2016 track, Enterprise.

Don’t wait! The call for presentations for Spark Summit East closes in less
than a week, November 22nd at 11:59 pm PST. Please visit our submission
page (https://spark-summit.org/east-2016/) for additional details.

Regards,
Spark Summit Organizers


How to Add builtin geometry type to SparkSQL?

2015-11-18 Thread ddcd
Hi all,

I'm considering adding a geometry type to SparkSQL.

I know that there is a project named sparkGIS, which is an add-on to SparkSQL.
The project uses user defined types and user defined functions.
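For illustration, a minimal sketch of what the UDF side of such an add-on looks
like (the function name, columns and table are hypothetical); the point is that
the logic stays opaque to the Catalyst optimizer:

// register a plain Scala function as a SQL UDF
sqlContext.udf.register("st_distance",
  (x1: Double, y1: Double, x2: Double, y2: Double) =>
    math.sqrt((x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2)))

// Catalyst only sees a black-box ScalaUDF call here, so it cannot
// optimize based on any geometric properties
sqlContext.sql("SELECT name FROM places WHERE st_distance(x, y, 2.35, 48.85) < 0.1")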

But I think a built-in type would be better in that it would allow more
optimization. I'm reading the SparkSQL code, but it's really difficult for
me. Is there any document that would help?

Thank you!







Re: A proposal for Spark 2.0

2015-11-18 Thread Mark Hamstra
Ah, got it; by "stabilize" you meant changing the API, not just bug
fixing.  We're on the same page now.

On Wed, Nov 18, 2015 at 3:39 PM, Kostas Sakellis 
wrote:

> A 1.6.x release will only fix bugs - we typically don't change APIs in z
> releases. The Dataset API is experimental and so we might be changing the
> APIs before we declare it stable. This is why I think it is important to
> first stabilize the Dataset API with a Spark 1.7 release before moving to
> Spark 2.0. This will benefit users that would like to use the new Dataset
> APIs but can't move to Spark 2.0 because of the backwards incompatible
> changes, like removal of deprecated APIs, Scala 2.11 etc.
>
> Kostas
>
>
> On Fri, Nov 13, 2015 at 12:26 PM, Mark Hamstra 
> wrote:
>
>> Why does stabilization of those two features require a 1.7 release
>> instead of 1.6.1?
>>
>> On Fri, Nov 13, 2015 at 11:40 AM, Kostas Sakellis 
>> wrote:
>>
>>> We have veered off the topic of Spark 2.0 a little bit here - yes, we can
>>> talk about RDD vs. DS/DF more, but let's refocus on Spark 2.0. I'd like to
>>> propose that we have one more 1.x release after Spark 1.6. This will allow us
>>> to stabilize a few of the new features that were added in 1.6:
>>>
>>> 1) the experimental Datasets API
>>> 2) the new unified memory manager.
>>>
>>> I understand our goal for Spark 2.0 is to offer an easy transition but
>>> there will be users that won't be able to seamlessly upgrade given what we
>>> have discussed as in scope for 2.0. For these users, having a 1.x release
>>> with these new features/APIs stabilized will be very beneficial. This might
>>> make Spark 1.7 a lighter release but that is not necessarily a bad thing.
>>>
>>> Any thoughts on this timeline?
>>>
>>> Kostas Sakellis
>>>
>>>
>>>
>>> On Thu, Nov 12, 2015 at 8:39 PM, Cheng, Hao  wrote:
>>>
 Agreed, more features/APIs/optimizations need to be added in DF/DS.



 I mean, we need to think about what kind of RDD APIs we have to provide
 to developers; maybe the fundamental APIs are enough, like ShuffledRDD
 etc. But PairRDDFunctions is probably not in this category, as we can do the
 same thing easily with DF/DS, with even better performance.



 *From:* Mark Hamstra [mailto:m...@clearstorydata.com]
 *Sent:* Friday, November 13, 2015 11:23 AM
 *To:* Stephen Boesch

 *Cc:* dev@spark.apache.org
 *Subject:* Re: A proposal for Spark 2.0



 Hmmm... to me, that seems like precisely the kind of thing that argues
 for retaining the RDD API, but not as the first thing presented to new Spark
 developers: "Here's how to use groupBy with DataFrames.... Until the
 optimizer is more fully developed, that won't always get you the best
 performance that can be obtained.  In these particular circumstances, ...,
 you may want to use the low-level RDD API while setting
 preservesPartitioning to true.  Like this...."



 On Thu, Nov 12, 2015 at 7:05 PM, Stephen Boesch 
 wrote:

 My understanding is that RDDs presently have more support for
 complete control of partitioning, which is a key consideration at scale.
 While partitioning control is still piecemeal in DF/DS, it would seem
 premature to make RDDs a second-tier approach to Spark development.



 An example is the use of groupBy when we know that the source relation
 (/RDD) is already partitioned on the grouping expressions.  AFAIK Spark SQL
 still does not allow that knowledge to be applied by the optimizer, so
 a full shuffle will be performed. However, with the native RDD API we can use
 preservesPartitioning=true.
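 A minimal sketch of that RDD-side behaviour (the key space and the partition
 count are arbitrary):

 import org.apache.spark.HashPartitioner

 // pre-partition a pair RDD on its key
 val byKey = sc.parallelize(1 to 1000000)
   .map(i => (i % 1000, 1))
   .partitionBy(new HashPartitioner(200))

 // reduceByKey sees the existing partitioner and aggregates without a further shuffle
 val counts = byKey.reduceByKey(_ + _)

 // mapPartitions can declare that it leaves the partitioning intact
 val doubled = byKey.mapPartitions(
   _.map { case (k, v) => (k, v * 2) }, preservesPartitioning = true)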



 2015-11-12 17:42 GMT-08:00 Mark Hamstra :

 The place of the RDD API in 2.0 is also something I've been wondering
 about.  I think it may be going too far to deprecate it, but changing
 emphasis is something that we might consider.  The RDD API came well before
 DataFrames and DataSets, so programming guides, introductory how-to
 articles and the like have, to this point, also tended to emphasize RDDs --
 or at least to deal with them early.  What I'm thinking is that with 2.0
 maybe we should overhaul all the documentation to de-emphasize and
 reposition RDDs.  In this scheme, DataFrames and DataSets would be
 introduced and fully addressed before RDDs.  They would be presented as the
 normal/default/standard way to do things in Spark.  RDDs, in contrast,
 would be presented later as a kind of lower-level, closer-to-the-metal API
 that can be used in atypical, more specialized contexts where DataFrames or
 DataSets don't fully fit.



 On Thu, Nov 12, 2015 at 5:17 PM, Cheng, Hao 
 wrote:

 I am not sure what the best practice for this 

RE: SequenceFile and object reuse

2015-11-18 Thread jeff saremi
You're not seeing the issue because you perform one additional map:

map{case (k,v) => (k.get(), v.toString)}

Instead of being able to use the Text that was read, you had to create a tuple out of
the string of the text.

That is exactly why I asked this question. Why do we have to do this additional
processing? What is the rationale behind it?
Are there other ways of reading a Hadoop file (or any other file) that would not
incur this additional step?

thanks
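One candidate, only a sketch (the path is a placeholder, and it assumes IntWritable
keys and Text values), is the typed sequenceFile overload, whose implicit
WritableConverters turn each record into a fresh Int/String as it is read, so no
separate copying map is needed:

// run in the spark-shell, where sc is already defined
val rdd = sc.sequenceFile[Int, String]("/path/to/seqfile")
rdd.cache()                    // safe to cache: the values are already immutable copies
rdd.take(5).foreach(println)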

Date: Thu, 19 Nov 2015 13:26:31 +0800
Subject: Re: FW: SequenceFile and object reuse
From: zjf...@gmail.com
To: jeffsar...@hotmail.com
CC: dev@spark.apache.org

Would this be an issue with the raw data? I use the following simple code and
don't hit the issue you mentioned. Otherwise it would be better to share your code.

import org.apache.hadoop.io.{IntWritable, Text}

val rdd = sc.sequenceFile("/Users/hadoop/Temp/Seq", classOf[IntWritable], classOf[Text])
rdd.map { case (k, v) => (k.get(), v.toString) }.collect() foreach println
On Thu, Nov 19, 2015 at 12:04 PM, jeff saremi  wrote:



I sent this to the user forum. I got no responses. Could someone here please 
help? thanks
jeff

From: jeffsar...@hotmail.com
To: u...@spark.apache.org
Subject: SequenceFile and object reuse
Date: Fri, 13 Nov 2015 13:29:58 -0500




So we tried reading a SequenceFile in Spark and realized that all our records
have ended up becoming the same.
Then one of us found this:

Note: Because Hadoop's RecordReader class re-uses the same Writable object for 
each record, directly caching the returned RDD or directly passing it to an 
aggregation or shuffle operation will create many references to the same 
object. If you plan to directly cache, sort, or aggregate Hadoop writable 
objects, you should first copy them using a map function.

Is there anyone that can shed some light on this bizarre behavior and the
decisions behind it?
And I also would like to know if anyone has been able to read a binary file and not
incur the additional map() as suggested above? What format did you use?

thanks
Jeff
  


-- 
Best Regards

Jeff Zhang
  

FW: SequenceFile and object reuse

2015-11-18 Thread jeff saremi
I sent this to the user forum. I got no responses. Could someone here please 
help? thanks
jeff

From: jeffsar...@hotmail.com
To: u...@spark.apache.org
Subject: SequenceFile and object reuse
Date: Fri, 13 Nov 2015 13:29:58 -0500




So we tried reading a SequenceFile in Spark and realized that all our records
have ended up becoming the same.
Then one of us found this:

Note: Because Hadoop's RecordReader class re-uses the same Writable object for 
each record, directly caching the returned RDD or directly passing it to an 
aggregation or shuffle operation will create many references to the same 
object. If you plan to directly cache, sort, or aggregate Hadoop writable 
objects, you should first copy them using a map function.

Is there anyone that can shed some light on this bizarre behavior and the
decisions behind it?
And I also would like to know if anyone has been able to read a binary file and not
incur the additional map() as suggested above? What format did you use?

thanks
Jeff
  

Hash Partitioning & Sort Merge Join

2015-11-18 Thread gsvic
In the case of a sort-merge join in which a shuffle (exchange) will be performed, I
have the following questions (please correct me if my understanding is not
correct):

Let's say that relation A is a JSONRelation (640 MB) on HDFS, where the
block size is 64 MB. This will produce a Scan JSONRelation() with 10 partitions
(640 / 64), where each of these partitions will contain |A| / 10 rows.

The second step will be a hashPartitioning(#key, 200), where #key is the
equi-join condition and 200 is the default number of shuffle partitions
(spark.sql.shuffle.partitions). Each partition will be computed in an
individual task, in which every row will be hashed on the #key and then
written to the corresponding chunk (of the 200 resulting chunks) directly on
disk.

Q1: What happens if a resulting hashed row on executor A must be written
to a chunk which is stored on executor B? Does it use the
HashShuffleManager to transfer it over the network?

Q2: After the Sort (3rd) step there will be 200 and 200 resulting
partitions/chunks for relations A and B respectively, which will be
combined into 200 SortMergeJoin tasks, each of which will contain
(|A|/200 + |B|/200) rows. For each pair (chunkOfA, chunkOfB), will chunkOfA
and chunkOfB contain rows for the same hash keys?

Q3: In the SortMergeJoin of Q2, I suppose that each of the 200 SortMergeJoin
tasks joins two partitions/chunks with the same hash key. So, if a task
corresponds to a hash key X, does it use ShuffleBlockFetcherIterator to fetch
the two shuffle chunks (of relations A and B) with hash key X?

Q4: Which sorting algorithm is being used?
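For reference, a small sketch (the paths, schema and join column are placeholders)
that prints the physical plan, where the hashPartitioning Exchange, Sort and
SortMergeJoin operators discussed above can be inspected, along with the
shuffle-partition setting:

val a = sqlContext.read.json("hdfs:///data/A")
val b = sqlContext.read.json("hdfs:///data/B")

// 200 is already the default for spark.sql.shuffle.partitions
sqlContext.setConf("spark.sql.shuffle.partitions", "200")

val joined = a.join(b, a("key") === b("key"))
joined.explain(true)   // prints the parsed, analyzed, optimized and physical plans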






Re: FW: SequenceFile and object reuse

2015-11-18 Thread Jeff Zhang
Would this be an issue with the raw data? I use the following simple code
and don't hit the issue you mentioned. Otherwise it would be better to share
your code.

import org.apache.hadoop.io.{IntWritable, Text}

val rdd = sc.sequenceFile("/Users/hadoop/Temp/Seq",
  classOf[IntWritable], classOf[Text])
rdd.map { case (k, v) => (k.get(), v.toString) }.collect() foreach println


On Thu, Nov 19, 2015 at 12:04 PM, jeff saremi 
wrote:

> I sent this to the user forum. I got no responses. Could someone here
> please help? thanks
> jeff
>
> --
> From: jeffsar...@hotmail.com
> To: u...@spark.apache.org
> Subject: SequenceFile and object reuse
> Date: Fri, 13 Nov 2015 13:29:58 -0500
>
>
> So we tried reading a SequenceFile in Spark and realized that all our
> records have ended up becoming the same.
> Then one of us found this:
>
> Note: Because Hadoop's RecordReader class re-uses the same Writable object
> for each record, directly caching the returned RDD or directly passing it
> to an aggregation or shuffle operation will create many references to the
> same object. If you plan to directly cache, sort, or aggregate Hadoop
> writable objects, you should first copy them using a map function.
>
> Is there anyone that can shed some light on this bizarre behavior and the
> decisions behind it?
> And I also would like to know if anyone has been able to read a binary file
> and not incur the additional map() as suggested above? What format
> did you use?
>
> thanks
> Jeff
>



-- 
Best Regards

Jeff Zhang