Re: Welcome Zhenhua Wang as a Spark committer

2018-04-02 Thread wangzhenhua (G)

Thanks everyone! It’s my great pleasure to be part of such a professional and 
innovative community!


best regards,
-Zhenhua(Xander)




Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2 read path

2017-09-07 Thread wangzhenhua (G)
+1 (non-binding). Great to see the data source API is going to be improved!

best regards,
-Zhenhua(Xander)

From: Dongjoon Hyun [mailto:dongjoon.h...@gmail.com]
Sent: September 8, 2017 4:07
To: 蒋星博
Cc: Michael Armbrust; Reynold Xin; Andrew Ash; Herman van Hövell tot
Westerflier; Ryan Blue; Spark dev list; Suresh Thalamati; Wenchen Fan
Subject: Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2 read path

+1 (non-binding).

On Thu, Sep 7, 2017 at 12:46 PM, 蒋星博 wrote:
+1


Reynold Xin wrote on Thursday, September 7, 2017 at 12:04 PM:
+1 as well

On Thu, Sep 7, 2017 at 9:12 PM, Michael Armbrust wrote:
+1

On Thu, Sep 7, 2017 at 9:32 AM, Ryan Blue wrote:

+1 (non-binding)

Thanks for making the updates reflected in the current PR. It would be great to 
see the doc updated before it is finally published though.

Right now it feels like this SPIP is focused more on getting the basics right
for what many datasources are already doing in API V1 combined with other
private APIs, vs. pushing forward the state of the art for performance.

I think that’s the right approach for this SPIP. We can add the support you’re 
talking about later with a more specific plan that doesn’t block fixing the 
problems that this addresses.

On Thu, Sep 7, 2017 at 2:00 AM, Herman van Hövell tot Westerflier wrote:
+1 (binding)

I personally believe that there is quite a big difference between having a
generic data source interface with a low surface area and pushing down a
significant part of query processing into a datasource. The latter has a much
wider surface area and will require us to stabilize most of the internal
catalyst APIs, which will be a significant burden on the community to maintain
and has the potential to slow development velocity significantly. If you want
to write such integrations, then you should be prepared to work with catalyst
internals and own up to the fact that things might change across minor versions
(and in some cases even maintenance releases). If you are willing to go down
that road, then your best bet is to use the already existing Spark session
extensions, which allow you to write such integrations and can be used as
an `escape hatch`.
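
As a minimal sketch of that escape hatch (assuming Spark 2.2+ session
extensions; MyPushdownRule is a hypothetical no-op rule standing in for a real
catalyst-level integration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: a real integration would rewrite the plan here, and has
// to accept that catalyst internals may change between releases.
object MyPushdownRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions(ext => ext.injectOptimizerRule(_ => MyPushdownRule))
  .getOrCreate()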


On Thu, Sep 7, 2017 at 10:23 AM, Andrew Ash wrote:
+0 (non-binding)

I think there are benefits to unifying all the Spark-internal datasources into 
a common public API for sure.  It will serve as a forcing function to ensure 
that those internal datasources aren't advantaged over datasources developed 
externally as plugins to Spark, and that all Spark features are available to 
all datasources.

But I also think this read-path proposal avoids the more difficult questions 
around how to continue pushing datasource performance forwards.  James Baker 
(my colleague) had a number of questions about advanced pushdowns (combined 
sorting and filtering), and Reynold also noted that pushdown of aggregates and 
joins is desirable on longer timeframes as well. The Spark community saw 
similar requests for aggregate pushdown in SPARK-12686, join pushdown in 
SPARK-20259, and arbitrary plan pushdown in SPARK-12449.  Clearly a number of 
people are interested in this kind of performance work for datasources.

To leave enough space for datasource developers to continue experimenting with 
advanced interactions between Spark and their datasources, I'd propose we leave 
some sort of escape valve that enables these datasources to keep pushing the 
boundaries without forking Spark.  Possibly that looks like an additional 
unsupported/unstable interface that pushes down an entire (unstable API) 
logical plan, which is expected to break API on every release.   (Spark 
attempts this full-plan pushdown, and if that fails Spark ignores it and 
continues on with the rest of the V2 API for compatibility).  Or maybe it looks 
like something else that we don't know of yet.  Possibly this falls outside of 
the desired goals for the V2 API and instead should be a separate SPIP.
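
To make that idea concrete, a purely hypothetical sketch of such an unstable
interface could look like the following; both the trait and the method are
invented here for illustration and are not part of any Spark API:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical, explicitly unstable interface: the source either accepts the
// whole logical plan (returning true) or declines, in which case Spark falls
// back to the regular V2 read path. Expected to break across releases.
trait SupportsFullPlanPushdown {
  def pushFullPlan(plan: LogicalPlan): Boolean
}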

If we had a plan for this kind of escape valve for advanced datasource 
developers, I'd be an unequivocal +1. Right now it feels like this SPIP is 
focused more on getting the basics right for what many datasources are already 
doing in API V1 combined with other private APIs, vs. pushing forward the state 
of the art for performance.

Andrew

On Wed, Sep 6, 2017 at 10:56 PM, Suresh Thalamati wrote:
+1 (non-binding)


On Sep 6, 2017, at 7:29 PM, Wenchen Fan wrote:

Hi all,

In the previous discussion, we decided to split the read and write paths of data 
source v2 into 2 SPIPs, and I'm sending this 

Re: Limit Query Performance Suggestion

2017-01-18 Thread wangzhenhua (G)
How about this:
1. we can make LocalLimit shuffle to multiple partitions, i.e. create a new 
partitioner to uniformly dispatch the data

import org.apache.spark.Partitioner

// Round-robin partitioner that dispatches rows uniformly across partitions,
// ignoring the key.
class LimitUniformPartitioner(partitions: Int) extends Partitioner {

  def numPartitions: Int = partitions

  private var num = 0

  def getPartition(key: Any): Int = {
    // cycle through the partitions regardless of the key
    num = (num + 1) % partitions
    num
  }

  override def equals(other: Any): Boolean = other match {
    case p: LimitUniformPartitioner =>
      p.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

2. then in GlobalLimit, we only take the first 
limit_number / num_of_shuffle_partitions elements in each partition, as sketched below.
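
A rough RDD-level sketch of steps 1 and 2, reusing LimitUniformPartitioner from
above; sc, limit, numShufflePartitions and the parallelized input are all
placeholder assumptions standing in for LocalLimit's real output:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("limit-sketch").setMaster("local[*]"))
val limit = 10000
val numShufflePartitions = 5
val perPartition = limit / numShufflePartitions    // rows each GlobalLimit task keeps

val limited = sc.parallelize(1 to 1000000)         // stands in for LocalLimit's output
  .map(row => (row, ()))                           // partitionBy needs key-value pairs
  .partitionBy(new LimitUniformPartitioner(numShufflePartitions))
  .mapPartitions(iter => iter.take(perPartition), preservesPartitioning = true)
  .keys                                            // drop the dummy values again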

One issue left is how to decide the shuffle partition number.
We can have a config for the maximum number of elements each GlobalLimit task 
should process, then do a factorization to get a per-partition count closest to 
that config; a small sketch follows the example.
E.g. if the config is 2000:
if limit = 10000, since 10000 = 2000 * 5, we shuffle to 5 partitions
if the factorization closest to the config splits the limit into 9 chunks, we shuffle to 9 partitions
if the limit is a prime number, we just fall back to a single partition
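
A minimal sketch of that partition-count selection, assuming maxPerTask stands
for the proposed config value (the linear divisor scan is only for clarity):

def choosePartitions(limit: Int, maxPerTask: Int): Int = {
  if (limit <= maxPerTask) {
    1                                            // small limits need no extra partitions
  } else {
    // proper divisors of the limit whose per-partition share stays within the config
    val candidates = (2 until limit).filter(n => limit % n == 0 && limit / n <= maxPerTask)
    if (candidates.isEmpty) 1 else candidates.min  // prime limits fall back to one partition
  }
}

choosePartitions(10000, 2000)   // 5, since 10000 = 2000 * 5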

best regards,
-zhenhua


-----Original Message-----
From: Liang-Chi Hsieh [mailto:vii...@gmail.com]
Sent: January 18, 2017 15:48
To: dev@spark.apache.org
Subject: Re: Limit Query Performance Suggestion


Hi Sujith,

I saw your updated post. Seems it makes sense to me now.

If you use a very big limit number, the shuffling before `GlobalLimit` would of 
course be a performance bottleneck, even if it can eventually shuffle enough 
data to the single partition.

Unlike `CollectLimit`, I actually think there is no reason `GlobalLimit` must 
shuffle all the limited data from all partitions to one single machine for 
query execution. In other words, I think we can avoid shuffling data in 
`GlobalLimit`.

I have an idea to improve this and may update here later if I can make it work.


sujith71955 wrote
> Dear Liang,
> 
> Thanks for your valuable feedback.
> 
> There was a mistake in the previous post; I corrected it. As you 
> mentioned, in `GlobalLimit` we will only take the required number of 
> rows from the input iterator, which really pulls data from local blocks 
> and remote blocks.
> But if the limit value is very high (>= 1000), a shuffle exchange 
> happens between `GlobalLimit` and `LocalLimit` to retrieve data from 
> all partitions into one partition, and since the limit value is very 
> large the performance bottleneck still exists.
> 
> Soon, in the next post, I will publish a test report with sample data 
> and also figure out a solution for this problem.
> 
> Please let me know of any clarifications or suggestions regarding 
> this issue.
> 
> Regards,
> Sujith





-
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org





timeout in shuffle problem

2016-01-24 Thread wangzhenhua (G)
Hi,

I have a problem with a timeout in shuffle. It happens after the shuffle write 
and at the start of the shuffle read; logs from the driver and executors are 
shown below. The Spark version is 1.5. Looking forward to your replies. Thanks!

Logs on the driver only have warnings:
WARN TaskSetManager: Lost task 38.0 in stage 27.0 (TID 127459, linux-162): FetchFailed(BlockManagerId(66, 172.168.100.12, 23028), shuffleId=9, mapId=55, reduceId=38, message=
org.apache.spark.shuffle.FetchFailedException: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:173)
    at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$executePartition$1(sort.scala:160)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at