Re: Welcome Zhenhua Wang as a Spark committer
Thanks everyone! It’s my great pleasure to be part of such a professional and innovative community! best regards, -Zhenhua(Xander)
Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2 read path
+1 (non-binding)

Great to see the data source API is going to be improved!

Best regards,
-Zhenhua (Xander)

From: Dongjoon Hyun [mailto:dongjoon.h...@gmail.com]
Sent: September 8, 2017 4:07
To: 蒋星博
Cc: Michael Armbrust; Reynold Xin; Andrew Ash; Herman van Hövell tot Westerflier; Ryan Blue; Spark dev list; Suresh Thalamati; Wenchen Fan
Subject: Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2 read path

+1 (non-binding).

On Thu, Sep 7, 2017 at 12:46 PM, 蒋星博 wrote:
+1

On Thu, Sep 7, 2017 at 12:04 PM, Reynold Xin wrote:
+1 as well

On Thu, Sep 7, 2017 at 9:12 PM, Michael Armbrust wrote:
+1

On Thu, Sep 7, 2017 at 9:32 AM, Ryan Blue wrote:
+1 (non-binding)

Thanks for making the updates reflected in the current PR. It would be great to see the doc updated before it is finally published though.

> Right now it feels like this SPIP is focused more on getting the basics right for what many datasources are already doing in API V1 combined with other private APIs, vs pushing forward state of the art for performance.

I think that’s the right approach for this SPIP. We can add the support you’re talking about later with a more specific plan that doesn’t block fixing the problems that this addresses.

On Thu, Sep 7, 2017 at 2:00 AM, Herman van Hövell tot Westerflier wrote:
+1 (binding)

I personally believe that there is quite a big difference between having a generic data source interface with a low surface area and pushing down a significant part of query processing into a datasource. The latter has a much wider surface area and will require us to stabilize most of the internal Catalyst APIs, which will be a significant burden on the community to maintain and has the potential to slow development velocity significantly. If you want to write such integrations then you should be prepared to work with Catalyst internals and own up to the fact that things might change across minor versions (and in some cases even maintenance releases).
If you are willing to go down that road, then your best bet is to use the already existing Spark session extensions, which will allow you to write such integrations and can be used as an `escape hatch`.

On Thu, Sep 7, 2017 at 10:23 AM, Andrew Ash wrote:
+0 (non-binding)

I think there are benefits to unifying all the Spark-internal datasources into a common public API for sure. It will serve as a forcing function to ensure that those internal datasources aren't advantaged vs datasources developed externally as plugins to Spark, and that all Spark features are available to all datasources.

But I also think this read-path proposal avoids the more difficult questions around how to continue pushing datasource performance forward. James Baker (my colleague) had a number of questions about advanced pushdowns (combined sorting and filtering), and Reynold also noted that pushdown of aggregates and joins is desirable on longer timeframes as well. The Spark community saw similar requests for aggregate pushdown in SPARK-12686, join pushdown in SPARK-20259, and arbitrary plan pushdown in SPARK-12449. Clearly a number of people are interested in this kind of performance work for datasources.

To leave enough space for datasource developers to continue experimenting with advanced interactions between Spark and their datasources, I'd propose we leave some sort of escape valve that enables these datasources to keep pushing the boundaries without forking Spark. Possibly that looks like an additional unsupported/unstable interface that pushes down an entire (unstable API) logical plan, which is expected to break API on every release. (Spark attempts this full-plan pushdown, and if that fails Spark ignores it and continues on with the rest of the V2 API for compatibility.) Or maybe it looks like something else that we don't know of yet. Possibly this falls outside of the desired goals for the V2 API and instead should be a separate SPIP.
If we had a plan for this kind of escape valve for advanced datasource developers I'd be an unequivocal +1. Right now it feels like this SPIP is focused more on getting the basics right for what many datasources are already doing in API V1 combined with other private APIs, vs pushing forward state of the art for performance.

Andrew

On Wed, Sep 6, 2017 at 10:56 PM, Suresh Thalamati wrote:
+1 (non-binding)

On Sep 6, 2017, at 7:29 PM, Wenchen Fan wrote:
Hi all,

In the previous discussion, we decided to split the read and write path of data source v2 into 2 SPIPs, and I'm sending this
Re: Limit Query Performance Suggestion
How about this:

1. We can make LocalLimit shuffle to multiple partitions, i.e. create a new partitioner that dispatches the data uniformly:

    import org.apache.spark.Partitioner

    class LimitUniformPartitioner(partitions: Int) extends Partitioner {
      def numPartitions: Int = partitions
      // Round-robin counter; the key itself is ignored.
      var num = 0
      def getPartition(key: Any): Int = {
        num = num + 1
        num % partitions
      }
      override def equals(other: Any): Boolean = other match {
        // Compare against this partitioner type (the original draft matched HashPartitioner).
        case p: LimitUniformPartitioner => p.numPartitions == numPartitions
        case _ => false
      }
      override def hashCode: Int = numPartitions
    }

2. Then in GlobalLimit, we only take the first limit_number/num_of_shufflepartitions elements in each partition.

One issue left is how to decide the shuffle partition number. We can have a config for the maximum number of elements each GlobalLimit task processes, then factorize the limit to get a number closest to that config. E.g. if the config is 2000:
if limit=10000, 10000 = 2000 * 5, we shuffle to 5 partitions
if limit=, = * 9, we shuffle to 9 partitions
if limit is a prime number, we just fall back to a single partition

Best regards,
-zhenhua

-Original Message-
From: Liang-Chi Hsieh [mailto:vii...@gmail.com]
Sent: January 18, 2017 15:48
To: dev@spark.apache.org
Subject: Re: Limit Query Performance Suggestion

Hi Sujith,

I saw your updated post. It makes sense to me now.

If you use a very big limit number, the shuffling before `GlobalLimit` would of course be a performance bottleneck, even if it can eventually shuffle enough data to the single partition.

Unlike `CollectLimit`, I think there is actually no reason `GlobalLimit` must shuffle all limited data from all partitions to one single machine with respect to query execution. In other words, I think we can avoid shuffling data in `GlobalLimit`.

I have an idea to improve this and may update here later if I can make it work.

sujith71955 wrote
> Dear Liang,
>
> Thanks for your valuable feedback.
>
> There was a mistake in the previous post, which I have corrected. As you
> mentioned, in `GlobalLimit` we only take the required number of
> rows from the input iterator, which really pulls data from local blocks
> and remote blocks.
> But if the limit value is very high (>= 1000), a shuffle exchange
> happens between `GlobalLimit` and `LocalLimit`
> to retrieve data from all partitions into one partition. Since the limit
> value is very large, the performance bottleneck still exists.
>
> In my next post I will publish a test report with sample data and
> will also work out a solution for this problem.
>
> Please let me know if you need any clarifications or have suggestions regarding
> this issue.
>
> Regards,
> Sujith

- Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Limit-Query-Performance-Suggestion-tp20570p20652.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
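The partition-count rule proposed at the top of this thread can be sketched in plain Scala, without Spark, to check the arithmetic. This is only an illustration under assumptions: the object and method names (`LimitPartitionSketch`, `rowsPerPartition`, `numShufflePartitions`, `roundRobin`, `globalLimit`) and the parameter `maxRowsPerTask` are hypothetical, and "closest to the config" is read here as "the largest divisor of the limit that does not exceed the config". The round-robin step mimics the counter in `getPartition` over a single in-memory sequence, not a real shuffle.

```scala
// Hypothetical sketch, not Spark code: models the proposed rule with plain collections.
object LimitPartitionSketch {

  // Assumption: "closest to the config" means the largest divisor of `limit`
  // that is <= maxRowsPerTask. 1 always divides, so the search always succeeds.
  def rowsPerPartition(limit: Int, maxRowsPerTask: Int): Int = {
    require(limit > 0 && maxRowsPerTask > 0, "limit and maxRowsPerTask must be positive")
    (math.min(limit, maxRowsPerTask) to 1 by -1).find(limit % _ == 0).getOrElse(1)
  }

  // Number of shuffle partitions: limit / rowsPerPartition, falling back to a
  // single partition when the limit already fits in one task or has no useful
  // divisor (for example a prime limit larger than the config).
  def numShufflePartitions(limit: Int, maxRowsPerTask: Int): Int = {
    val perPartition = rowsPerPartition(limit, maxRowsPerTask)
    if (limit <= maxRowsPerTask || perPartition == 1) 1 else limit / perPartition
  }

  // Round-robin dispatch that mirrors the counter in getPartition:
  // the counter is incremented before use, so row i goes to (i + 1) % partitions.
  def roundRobin[T](rows: Seq[T], partitions: Int): Map[Int, Seq[T]] =
    rows.zipWithIndex
      .groupBy { case (_, i) => (i + 1) % partitions }
      .map { case (p, indexed) => p -> indexed.map(_._1) }

  // Take limit / numPartitions rows from each partition and concatenate them.
  def globalLimit[T](rows: Seq[T], limit: Int, maxRowsPerTask: Int): Seq[T] = {
    val n = numShufflePartitions(limit, maxRowsPerTask)
    val perPartition = limit / n
    roundRobin(rows, n).toSeq.sortBy(_._1).flatMap(_._2.take(perPartition))
  }
}
```

One caveat of this reading: the per-partition take returns exactly `limit` rows only if every partition holds at least `limit / numPartitions` rows, which a single round-robin pass over enough input guarantees in this sketch but which may not hold when many map tasks each restart their own counter.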
timeout in shuffle problem
Hi,

I have a timeout problem in shuffle. It happens after the shuffle write, at the start of the shuffle read. Logs from the driver and executors are shown below. The Spark version is 1.5. Looking forward to your replies. Thanks!

The logs on the driver contain only warnings:

WARN TaskSetManager: Lost task 38.0 in stage 27.0 (TID 127459, linux-162): FetchFailed(BlockManagerId(66, 172.168.100.12, 23028), shuffleId=9, mapId=55, reduceId=38, message= org.apache.spark.shuffle.FetchFailedException: java.util.concurrent.TimeoutException: Timeout waiting for task. at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:306) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:173) at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$executePartition$1(sort.scala:160) at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169) at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169) at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at