[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239995006

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
--- End diff --

Yeah, that was done and then reverted in https://github.com/apache/spark/pull/23207/commits/7d104ebe854effb3d8ceb63cd408b9749cee1a8a. I will move it to a separate PR after this one.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239748512

--- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy
+   * reporter for both local accumulator and original reporter updating. As the reporter is a
--- End diff --

Ah, I think I see what you mean: now that we pass the context in, more things can be done in this interface. I'll delete this comment.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239744840

--- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy
+   * reporter for both local accumulator and original reporter updating. As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = {
--- End diff --

Yeah, thanks.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239744767

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
--- End diff --

Thanks, the back and forth is no problem at all. Thank you for your advice and help all along, Wenchen. :)
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239743452

--- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy
+   * reporter for both local accumulator and original reporter updating. As the reporter is a
--- End diff --

It's not stale; maybe I didn't express it clearly. What I mean here is that we always return a proxy reporter, like the current SQLShuffleWriteReporter, which updates not only its own metrics (the local accumulator) but also the existing reporter that is passed in (like the metrics in the context).
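A minimal sketch of the proxy idea described above, assuming a simplified reporter interface: every update is applied to the proxy's own local counters (standing in for the SQL-side accumulators) and forwarded to the reporter that was passed in (standing in for the task-context metrics). `WriteMetricsReporter`, `TaskWriteMetrics`, and `ProxyWriteMetricsReporter` are hypothetical names, not Spark's API.

```scala
// Hypothetical, simplified reporter interface (not Spark's ShuffleWriteMetricsReporter).
trait WriteMetricsReporter {
  def incBytesWritten(v: Long): Unit
  def incRecordsWritten(v: Long): Unit
  def incWriteTime(v: Long): Unit
}

// Plays the role of the existing reporter that comes from the task context.
class TaskWriteMetrics extends WriteMetricsReporter {
  var bytesWritten = 0L
  var recordsWritten = 0L
  var writeTime = 0L
  override def incBytesWritten(v: Long): Unit = bytesWritten += v
  override def incRecordsWritten(v: Long): Unit = recordsWritten += v
  override def incWriteTime(v: Long): Unit = writeTime += v
}

// Proxy: forwards every call to `underlying` and also updates its own local counters.
class ProxyWriteMetricsReporter(underlying: WriteMetricsReporter)
  extends WriteMetricsReporter {
  val local = new TaskWriteMetrics

  override def incBytesWritten(v: Long): Unit = {
    underlying.incBytesWritten(v)
    local.incBytesWritten(v)
  }
  override def incRecordsWritten(v: Long): Unit = {
    underlying.incRecordsWritten(v)
    local.incRecordsWritten(v)
  }
  override def incWriteTime(v: Long): Unit = {
    underlying.incWriteTime(v)
    local.incWriteTime(v)
  }
}
```

Since the reporter is called per row, each proxied update costs one extra forwarded call plus a counter update, which is the performance concern the Scaladoc in the diff raises.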
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207

retest this please.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207

```
the code looks much cleaner now!
```

Sorry for the rushed original code. I should, and will, pay more attention to writing clean code and to discussing alternative implementations.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239698500

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NS_TIMING_METRIC = "nanosecond"
--- End diff --

How about naming it `NORMALIZE_TIMING_METRIC`? It could be reused later for other timing metrics whose units need normalizing. If you think it's a strange name, I'll change it back.
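For illustration, a sketch of how a metric-type string could select a display formatter, with the proposed normalizing type converting nanoseconds to milliseconds before formatting. `MetricFormat` and `msToString` are hypothetical; `msToString` in particular is a much simpler stand-in for Spark's `Utils.msDurationToString`.

```scala
object MetricFormat {
  val TIMING_METRIC = "timing"
  val NORMALIZE_TIMING_METRIC = "normalizeTiming"

  // Simplified stand-in for a millisecond duration formatter.
  def msToString(ms: Long): String = s"$ms ms"

  // Pick a formatter by metric type; normalizing timings are recorded in nanoseconds.
  def formatter(metricsType: String): Long => String = metricsType match {
    case TIMING_METRIC           => ms => msToString(ms)
    case NORMALIZE_TIMING_METRIC => ns => msToString(ns / 1000 / 1000)
    case _                       => v => v.toString
  }
}
```

Keeping the unit conversion inside the formatter lets the raw value stay in whatever unit the writer reports, which is what would make a generic "normalize" type reusable for other nanosecond timings.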
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239698273

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -333,8 +343,19 @@ object ShuffleExchangeExec {
     new ShuffleDependency[Int, InternalRow, InternalRow](
       rddWithPartitionIds,
       new PartitionIdPassthrough(part.numPartitions),
-      serializer)
+      serializer,
+      shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
     dependency
   }
+
+  /**
+   * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter
+   * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
+   */
+  def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
+    (reporter: ShuffleWriteMetricsReporter) => {
--- End diff --

Yes, this doesn't work with Scala 2.11, and it should be written more readably. Done in 6378a3d.
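Scala 2.11 does not convert a lambda into an instance of a trait or abstract class with a single abstract method (SAM conversion is enabled by default only from 2.12), so a processor written as a lambda fails to compile there. A sketch of the 2.11-compatible form using an explicit anonymous subclass, assuming hypothetical `Reporter` and `Processor` types that mirror the shape in the diff:

```scala
// Hypothetical stand-ins for the reporter and processor types in the diff.
trait Reporter { def inc(v: Long): Unit }

trait Processor extends Serializable {
  def createMetricsReporter(reporter: Reporter): Reporter
}

object Processors {
  // Anonymous subclasses compile on Scala 2.11 as well as 2.12+.
  def wrapping(onInc: Long => Unit): Processor = new Processor {
    override def createMetricsReporter(reporter: Reporter): Reporter = new Reporter {
      override def inc(v: Long): Unit = {
        onInc(v)          // SQL-side update
        reporter.inc(v)   // original reporter update
      }
    }
  }
  // On Scala 2.12+ only, SAM conversion would also accept:
  //   val p: Processor = (reporter: Reporter) => reporter
}
```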
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239698174

--- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
@@ -0,0 +1,82 @@
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask.
+ */
+private[spark] trait ShuffleWriteProcessor extends Serializable with Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy
+   * reporter for both local accumulator and original reporter updating. As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter
--- End diff --

Got it. The trait can be introduced when we need more customization for the SQL shuffle. Done in 6378a3d.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207

```
Can we put the above in a closure and pass it into shuffle dependency? Then in SQL we just put the above in SQL using custom metrics.
```

Yeah, commit a780b70 achieves this by adding the `ShuffleWriteProcessor` abstraction. The read metrics rename was reverted in 7d104eb; I will do it, together with the display change, in another PR.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239548704

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
       FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
       RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
--- End diff --

Reimplemented in a780b70.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239312090

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
     testSparkPlanMetrics(df, 1, Map(
       2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
+      1L -> (("Exchange", Map(
+        "shuffle records written" -> 2L,
+        "records read" -> 2L,
+        "local blocks fetched" -> 2L,
--- End diff --

Got it, the display text change will be done in another PR.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207

```
can you separate the prs to rename read side metric and the write side change?
```

No problem, the next commit will revert the read-side rename changes.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239311564

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
       FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
       RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
--- End diff --

As discussed in https://github.com/apache/spark/pull/23207#discussion_r238909822, the latest approach carries a function of (reporter => reporter) in the shuffle dependency, which is used to create SQLShuffleWriteMetrics in ShuffleMapTask.
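A sketch of the "function in the shuffle dependency" approach, assuming simplified hypothetical types: the dependency built on the driver carries a `WriteReporter => WriteReporter` function (identity by default), and the task on the executor applies it to its own reporter before writing. `HypotheticalShuffleDependency` and `HypotheticalMapTask` are illustrative names, not Spark classes.

```scala
trait WriteReporter {
  def incRecordsWritten(v: Long): Unit
}

class CountingReporter extends WriteReporter {
  var records = 0L
  override def incRecordsWritten(v: Long): Unit = records += v
}

// Hypothetical dependency: built on the driver, carries a reporter => reporter function.
case class HypotheticalShuffleDependency(
    wrapReporter: WriteReporter => WriteReporter = (r: WriteReporter) => r)

object HypotheticalMapTask {
  // Executor side: wrap the task's own reporter, then report `rows` written records.
  def run(dep: HypotheticalShuffleDependency, taskReporter: WriteReporter, rows: Int): Unit = {
    val reporter = dep.wrapReporter(taskReporter)
    (1 to rows).foreach(_ => reporter.incRecordsWritten(1L))
  }
}
```

With the default identity function, core behavior is unchanged; SQL would supply a wrapping function, so the SQL-side metrics are created where the task runs rather than being shipped as a pre-built reporter.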
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239311141

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
--- End diff --

Both should be `private lazy val` (including the newly added readMetrics); I'll change them.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239311018

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  override lazy val metrics =
--- End diff --

Thanks, that makes sense. I'll separate the read and write metrics and pass each of them in.
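A sketch of the separation discussed above, under assumed names: the operator keeps read and write metrics in two private lazy maps, hands each side only its own map, and still exposes their union as `metrics`. `Metric` stands in for `SQLMetric`, and the `recordsRead` key is hypothetical; only `shuffleRecordsWritten` appears as a key in the diffs in this thread.

```scala
// Hypothetical stand-in for SQLMetric: a named, mutable counter.
class Metric(val displayName: String) {
  var value = 0L
}

// Hypothetical operator shaped like the CollectLimitExec change under discussion.
class HypotheticalCollectLimit {
  private lazy val writeMetrics: Map[String, Metric] =
    Map("shuffleRecordsWritten" -> new Metric("shuffle records written"))
  private lazy val readMetrics: Map[String, Metric] =
    Map("recordsRead" -> new Metric("records read"))

  // What the UI sees: one map with both sides, sharing the same Metric instances.
  lazy val metrics: Map[String, Metric] = readMetrics ++ writeMetrics

  // Each side of the shuffle is handed only its own metrics.
  def metricsForWriter: Map[String, Metric] = writeMetrics
  def metricsForReader: Map[String, Metric] = readMetrics
}
```

Because `++` copies references rather than metrics, an update made through the writer's map is visible through `metrics`.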
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239069014

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
       FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
       RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+    metricsReporter.incBytesWritten(v)
+    _bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+    metricsReporter.decBytesWritten(v)
+    _recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+    metricsReporter.incRecordsWritten(v)
+    _recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+    metricsReporter.incWriteTime(v)
+    _writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+    metricsReporter.decBytesWritten(v)
+    _bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

Only the shuffle write time in this PR. The other timing metric is `fetch wait time`, which is recorded in ms in `ShuffleBlockFetcherIterator`.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239067552

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
@@ -163,6 +171,8 @@ object SQLMetrics {
       Utils.bytesToString
     } else if (metricsType == TIMING_METRIC) {
       Utils.msDurationToString
+    } else if (metricsType == NS_TIMING_METRIC) {
+      duration => Utils.msDurationToString(duration / 1000 / 1000)
--- End diff --

Maybe it's OK. I tested this locally with the unit tests in SQLMetricsSuite; the result is below:

```
shuffle records written: 2
shuffle write time total (min, med, max): 37 ms (37 ms, 37 ms, 37 ms)
shuffle bytes written total (min, med, max): 66.0 B (66.0 B, 66.0 B, 66.0
```

In real workloads the shuffle bytes written will be much larger, so keeping the time in ms may be enough. WDYT?
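The `total (min, med, max)` lines above aggregate one value per task. A sketch of that aggregation, assuming the median is taken as the element at index `n / 2` of the sorted values; `MetricSummary.summarize` is a hypothetical helper, not Spark's implementation.

```scala
object MetricSummary {
  // Renders per-task values as "total (min, med, max)".
  // The median is the element at index n / 2 of the sorted values (upper median for even n).
  def summarize(values: Seq[Long], fmt: Long => String): String = {
    require(values.nonEmpty, "need at least one task value")
    val sorted = values.sorted
    val med = sorted(sorted.length / 2)
    s"${fmt(sorted.sum)} (${fmt(sorted.head)}, ${fmt(med)}, ${fmt(sorted.last)})"
  }
}
```

When only one task reports a value, all four numbers coincide, which is consistent with the `37 ms (37 ms, 37 ms, 37 ms)` line in the local test output.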
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239054315

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -299,12 +312,25 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
     val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value")
     // Assume the execution plan is
-    // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
+    // Project(nodeId = 0)
+    // +- ShuffledHashJoin(nodeId = 1)
+    //    :- Exchange(nodeId = 2)
+    //    :  +- Project(nodeId = 3)
+    //    :     +- LocalTableScan(nodeId = 4)
+    //    +- Exchange(nodeId = 5)
+    //       +- Project(nodeId = 6)
+    //          +- LocalTableScan(nodeId = 7)
     val df = df1.join(df2, "key")
     testSparkPlanMetrics(df, 1, Map(
       1L -> (("ShuffledHashJoin", Map(
         "number of output rows" -> 2L,
-        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"
+        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))),
+      2L -> (("Exchange", Map(
+        "shuffle records written" -> 2L,
+        "records read" -> 2L))),
--- End diff --

For most scenarios the answer is yes, but in cases like sort merge join, where two sort nodes reuse the same child, shuffle records written and records read will differ. I also added test cases for this here: https://github.com/xuanyuanking/spark/blob/SPARK-26193/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala#L217-L222
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239050549

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
     testSparkPlanMetrics(df, 1, Map(
       2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
+      1L -> (("Exchange", Map(
+        "shuffle records written" -> 2L,
+        "records read" -> 2L,
+        "local blocks fetched" -> 2L,
--- End diff --

I agree "fetch" is more of a code-level name from `ShuffleBlockFetcherIterator`, but do you mean just changing the display text in the UI? Many places, even api.scala, use the name `localBlocksFetched`, so renaming them all may not be a good choice for backporting. WDYT?
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239049398

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
@@ -163,6 +171,8 @@ object SQLMetrics {
       Utils.bytesToString
     } else if (metricsType == TIMING_METRIC) {
       Utils.msDurationToString
+    } else if (metricsType == NANO_TIMING_METRIC) {
+      duration => Utils.msDurationToString(duration / 10)
--- End diff --

Sorry about this! I changed it to `1000 / 1000`, as other places do, to be safe.
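The bug being fixed is a unit mistake: one millisecond is 10^6 nanoseconds, so dividing by 10 leaves the value far too large. A small check, assuming plain `Long` nanosecond values, comparing `/ 1000 / 1000` with the JDK's `TimeUnit.NANOSECONDS.toMillis`; `NanosToMillis` is a hypothetical helper.

```scala
import java.util.concurrent.TimeUnit

object NanosToMillis {
  // ns -> us -> ms; integer division truncates any sub-millisecond remainder.
  def viaDivision(ns: Long): Long = ns / 1000 / 1000

  // The same conversion through the JDK helper.
  def viaTimeUnit(ns: Long): Long = TimeUnit.NANOSECONDS.toMillis(ns)
}
```

Both forms truncate toward zero, so a value such as 37,123,456 ns is displayed as 37 ms either way.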
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239049121

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
@@ -78,6 +78,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NANO_TIMING_METRIC = "nanosecond"
--- End diff --

Done in cf35b9f.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239049030

--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
+    reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter {
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+    reporters.foreach(_.incBytesWritten(v))
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+    reporters.foreach(_.decRecordsWritten(v))
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+    reporters.foreach(_.incRecordsWritten(v))
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+    reporters.foreach(_.incWriteTime(v))
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+    reporters.foreach(_.decBytesWritten(v))
+  }
+}
+
+
+/**
+ * A proxy class of ShuffleReadMetricsReporter which proxy all metrics updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleReadMetricsReporter(
--- End diff --

Got it, thanks for your guidance. I reverted to the old approach, with only small changes to `SQLShuffleReadMetricsReporter` that follow https://github.com/apache/spark/pull/23147.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239048356

--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
--- End diff --

Thanks for your guidance, Reynold and Wenchen. I chose the second implementation: it is the lighter-weight option and follows a usage pattern similar to `SQLShuffleReadMetricsReporter`. Done in cf35b9f.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238732441

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L
+    // Register the shuffle write metrics reporter to shuffleWriteMetrics.
+    if (dep.shuffleWriteMetricsReporter.isDefined) {
+      context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

Cool! That's a cleaner implementation that keeps the read and write metrics reporters consistent, and the read metrics can also extend `ShuffleReadMetricsReporter` directly. Done in ca6c407.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 Thanks for your reply, Wenchen. There's a sketch doc attached to the JIRA: https://docs.google.com/document/d/1DX0gLkpk_NCE5MwI1_m4gnA2rLdjDkynZ02u2VWDR-8/edit ``` IMO shuffle write metrics is hard, as an RDD can have shuffle dependencies with multiple upstream RDDs. That said, in general the shuffle write metrics should belong to the upstream RDDs. ``` That's right, and that's also what I tried to do at first: logically, the upstream operator triggers the shuffle write, and my first implementation attempt also changed the SparkPlan base class to achieve this. ``` In Spark SQL, it's a little simpler, as the ShuffledRowRDD always have only one child, so it's reasonable to say that shuffle write metrics belong to ShuffledRowRDD. ``` Yes, maybe this is also what Reynold suggested: since ShuffleExchangeExec has only one child, we can simplify the implementation. But as the shuffle write metrics are updated inside the task, the core module still needs some changes.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 cc @cloud-fan @gatorsmile @rxin
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 retest this please.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 @SparkQA test this please.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 test this please.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 @SparkQA retest this please.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 @AmplabJenkins retest this please.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 @AmplabJenkins test this please.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 @AmplabJenkins
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 retest this please.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23207 test this please.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/23207 [SPARK-26193][SQL] Implement shuffle write metrics in SQL ## What changes were proposed in this pull request? 1. Implement `SQLShuffleWriteMetricsReporter` on the SQL side as the customized `ShuffleWriteMetricsReporter`. 2. Add shuffle write metrics to `ShuffleExchangeExec`, and use these metrics to create the corresponding `SQLShuffleWriteMetricsReporter` in the shuffle dependency. 3. Extend the current `ShuffleWriteMetrics` in the task context into a proxy, and register the shuffle write metrics reporter with it when the ShuffleMapTask is created on the executor. ## How was this patch tested? Add UT in SQLMetricsSuite. Manually tested locally, and updated the screenshot in the document attached to the JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-26193 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23207.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23207 commit 6b26c629439973045da77f7bcd4b852afe8ebd8b Author: Yuanjian Li Date: 2018-12-02T12:19:55Z Commit for fist time success commit a8a1225837419c99a3d9941046a2ca6b501f6dc8 Author: Yuanjian Li Date: 2018-12-03T12:06:34Z Simplify implement by add metrics in ShuffleExchangeExec commit 7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb Author: Yuanjian Li Date: 2018-12-03T12:40:41Z code clean and comments
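Items 1 and 3 describe a reporter that keeps the task-level shuffle write metrics up to date while also feeding the exchange operator's SQL metrics. A rough Python sketch of that wrapping, assuming hypothetical `Metric` and `TaskWriteMetrics` stand-ins for `SQLMetric` and the task's write metrics (the real Scala classes and signatures differ):

```python
class Metric:
    """Hypothetical stand-in for a SQL metric: an additive counter."""

    def __init__(self):
        self.value = 0

    def add(self, v):
        self.value += v


class TaskWriteMetrics:
    """Hypothetical stand-in for the task-level shuffle write metrics."""

    def __init__(self):
        self.bytes_written = 0
        self.records_written = 0

    def inc_bytes_written(self, v):
        self.bytes_written += v

    def inc_records_written(self, v):
        self.records_written += v


class SQLWriteReporter:
    """Applies each update to the task metrics, then mirrors it into the SQL metrics."""

    def __init__(self, task_metrics, bytes_metric, records_metric):
        self._task = task_metrics
        self._bytes = bytes_metric
        self._records = records_metric

    def inc_bytes_written(self, v):
        self._task.inc_bytes_written(v)
        self._bytes.add(v)

    def inc_records_written(self, v):
        self._task.inc_records_written(v)
        self._records.add(v)


task = TaskWriteMetrics()
bytes_metric, records_metric = Metric(), Metric()
reporter = SQLWriteReporter(task, bytes_metric, records_metric)
reporter.inc_bytes_written(64)
reporter.inc_records_written(2)
```

Unlike a generic grouped proxy, this wrapper has exactly one upstream target plus the SQL side, which is the same shape as `SQLShuffleMetricsReporter` wrapping `TempShuffleReadMetrics` on the read path.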
[GitHub] spark issue #23175: [SPARK-26142]followup: Move sql shuffle read metrics rel...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23175 Thanks @cloud-fan @rxin.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Implement shuffle read metrics...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r237346452 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.metric + +import org.apache.spark.executor.TempShuffleReadMetrics + +/** + * A shuffle metrics reporter for SQL exchange operators. + * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext. + * @param metrics All metrics in current SparkPlan. This param should not empty and + * contains all shuffle metrics defined in [[SQLMetrics.getShuffleReadMetrics]]. + */ +private[spark] class SQLShuffleMetricsReporter( + tempMetrics: TempShuffleReadMetrics, --- End diff -- Thanks, done in https://github.com/apache/spark/pull/23175.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Implement shuffle read metrics...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r237346431 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -194,4 +202,16 @@ object SQLMetrics { SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value))) } } + + /** + * Create all shuffle read relative metrics and return the Map. + */ + def getShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map( --- End diff -- Thanks, I renamed it to `createShuffleReadMetrics` and moved it to `SQLShuffleMetricsReporter`.
[GitHub] spark pull request #23175: [SPARK-26142]followup: Move sql shuffle read metr...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/23175 [SPARK-26142]followup: Move sql shuffle read metrics relatives to SQLShuffleMetricsReporter ## What changes were proposed in this pull request? Follow-up for https://github.com/apache/spark/pull/23128: move the SQL shuffle read metrics-related code to `SQLShuffleMetricsReporter`, in order to keep all of it in one place and avoid the possible problem of forgetting to update SQLShuffleMetricsReporter when others add new metrics. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-26142-follow Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23175.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23175 commit c042d15920f4ebd7166caafcb3696683978d42de Author: Yuanjian Li Date: 2018-11-29T02:47:33Z Move all sql shuffle read metrics stuff to SQLShuffleMetricsReporter
[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23128 @rxin Thanks for the guidance, I'll address these comments in a follow-up PR soon.
[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23128 Thanks @cloud-fan @gatorsmile @rxin !
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Implement shuffle read metrics...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236982643 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,10 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// Wrap the tempMetrics with SQLShuffleMetricsReporter here to support +// shuffle read metrics in SQL. --- End diff -- Thanks Wenchen, done in 8e84c5b.
[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23128 retest this please
[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23128 The Python UT failed because of a JVM crash. retest this please.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Implement shuffle read metrics...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236926403 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,10 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// metrics here could be empty cause user can use ShuffledRowRDD directly, +// so we just use the tempMetrics created in TaskContext in this case. --- End diff -- Removing this.
[GitHub] spark issue #23153: [SPARK-26147][SQL] only pull out unevaluable python udf ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23153 Thanks for the fix, Wenchen. ``` the suites should also construct the dummy python udf from both side. ``` I fixed the suites locally; they can simply be modified like this:
```
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
index d3867f2b6b..a0f8ae2fc7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
@@ -40,13 +40,18 @@ class PullOutPythonUDFInJoinConditionSuite extends PlanTest {
         CheckCartesianProducts) :: Nil
   }
 
-  val testRelationLeft = LocalRelation('a.int, 'b.int)
-  val testRelationRight = LocalRelation('c.int, 'd.int)
+  val attrA = 'a.int
+  val attrB = 'b.int
+  val attrC = 'c.int
+  val attrD = 'd.int
+
+  val testRelationLeft = LocalRelation(attrA, attrB)
+  val testRelationRight = LocalRelation(attrC, attrD)
 
   // Dummy python UDF for testing. Unable to execute.
   val pythonUDF = PythonUDF("pythonUDF", null,
     BooleanType,
-    Seq.empty,
+    Seq(attrA, attrC),
     PythonEvalType.SQL_BATCHED_UDF,
     udfDeterministic = true)
 
@@ -118,7 +123,7 @@ class PullOutPythonUDFInJoinConditionSuite extends PlanTest {
   test("pull out whole complex condition with multiple python udf") {
     val pythonUDF1 = PythonUDF("pythonUDF1", null,
       BooleanType,
-      Seq.empty,
+      Seq(attrA, attrC),
       PythonEvalType.SQL_BATCHED_UDF,
       udfDeterministic = true)
     val condition = (pythonUDF || 'a.attr === 'c.attr) && pythonUDF1
```
[GitHub] spark pull request #23153: [SPARK-26147][SQL] only pull out unevaluable pyth...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23153#discussion_r236743539 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -155,19 +155,20 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { } /** - * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF - * and pull them out from join condition. For python udf accessing attributes from only one side, - * they are pushed down by operation push down rules. If not (e.g. user disables filter push - * down rules), we need to pull them out in this rule too. + * PythonUDF in join condition can't be evaluated if it refers to attributes from both join sides. + * See `ExtractPythonUDFs` for details. This rule will detect un-evaluable PythonUDF and pull them + * out from join condition. */ object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { - def hasPythonUDF(expression: Expression): Boolean = { -expression.collectFirst { case udf: PythonUDF => udf }.isDefined + + private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean = { +expr.find { e => + PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && !canEvaluate(e, j.right) +}.isDefined } override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { -case j @ Join(_, _, joinType, condition) -if condition.isDefined && hasPythonUDF(condition.get) => +case j @ Join(_, _, joinType, Some(cond)) if hasUnevaluablePythonUDF(cond, j) => --- End diff -- Following the rule changes, we need to modify the suites in `PullOutPythonUDFInJoinConditionSuite`: the suites should also construct the dummy Python UDF from both sides.
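The new rule pulls a Python UDF out of the join condition only when it reads attributes from both join sides. A toy Python sketch of that check, assuming a simplified expression tree: `Expr`, `can_evaluate`, and `has_unevaluable_python_udf` are hypothetical names that loosely mirror `canEvaluate` and `hasUnevaluablePythonUDF` in the diff. It also shows why the suites had to change: a dummy UDF with no arguments references nothing, so it counts as evaluable on either side and is no longer pulled out.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, Iterator, List, Set


@dataclass
class Expr:
    """Toy expression node; `refs` are the attributes the node reads directly."""
    name: str
    is_python_udf: bool = False
    refs: FrozenSet[str] = frozenset()
    children: List["Expr"] = field(default_factory=list)

    def references(self) -> Set[str]:
        # Union of the attributes referenced anywhere in this subtree.
        out = set(self.refs)
        for child in self.children:
            out |= child.references()
        return out

    def walk(self) -> Iterator["Expr"]:
        # Pre-order traversal, similar in spirit to Catalyst's `find`.
        yield self
        for child in self.children:
            yield from child.walk()


def can_evaluate(expr: Expr, side_output: Set[str]) -> bool:
    # Evaluable on one side if every attribute it reads comes from that side.
    return expr.references() <= side_output


def has_unevaluable_python_udf(cond: Expr, left: Set[str], right: Set[str]) -> bool:
    return any(
        e.is_python_udf and not can_evaluate(e, left) and not can_evaluate(e, right)
        for e in cond.walk()
    )


def attr(n: str) -> Expr:
    return Expr(n, refs=frozenset({n}))


left_out, right_out = {"a", "b"}, {"c", "d"}
udf_both_sides = Expr("pythonUDF", True, children=[attr("a"), attr("c")])
udf_left_only = Expr("f", True, children=[attr("a")])
udf_no_args = Expr("dummy", True)  # like the old `Seq.empty` dummy UDF in the suite
```

With these inputs, only `udf_both_sides` (or a condition containing it) is reported as unevaluable; `udf_left_only` and `udf_no_args` are left in place.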
[GitHub] spark pull request #23153: [SPARK-26147][SQL] only pull out unevaluable pyth...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23153#discussion_r236647128 --- Diff: python/pyspark/sql/tests/test_udf.py --- @@ -209,6 +209,18 @@ def test_udf_in_join_condition(self): with self.sql_conf({"spark.sql.crossJoin.enabled": True}): self.assertEqual(df.collect(), [Row(a=1, b=1)]) +def test_udf_in_left_outer_join_condition(self): +# regression test for SPARK-26147 +from pyspark.sql.functions import udf, col +left = self.spark.createDataFrame([Row(a=1)]) +right = self.spark.createDataFrame([Row(b=1)]) +f = udf(lambda a: str(a), StringType()) +# The join condition can't be pushed down, as it refers to attributes from both sides. +# The Python UDF only refer to attributes from one side, so it's evaluable. +df = left.join(right, f("a") == col("b").cast("string"), how = "left_outer") --- End diff -- style nit: how="left_outer"
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236720141 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,14 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// metrics here could be empty cause user can use ShuffledRowRDD directly, --- End diff -- Done in 0348ae5.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236646423 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,14 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// metrics here could be empty cause user can use ShuffledRowRDD directly, --- End diff -- ``` do you mean we may leave the metrics empty when creating ShuffledRowRDD in tests? ``` Yes, like we did in `UnsafeRowSerializerSuite`. ``` I don't think we need to consider this case since ShuffledRowRDD is a private API ``` Got it. After searching for `new ShuffledRowRDD` across the source code, `UnsafeRowSerializerSuite` is the only place that uses it, so I'll change the test and delete the default value of `metrics` in this commit.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236251210 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -82,6 +82,14 @@ object SQLMetrics { private val baseForAvgMetric: Int = 10 + val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched" + val LOCAL_BLOCKS_FETCHED = "localBlocksFetched" + val REMOTE_BYTES_READ = "remoteBytesRead" + val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk" + val LOCAL_BYTES_READ = "localBytesRead" + val FETCH_WAIT_TIME = "fetchWaitTime" + val RECORDS_READ = "recordsRead" --- End diff -- Thanks for your advice, Wenchen. I tried `sync this list with ShuffleReadMetrics` locally and have left these comments below: 1. It's easy to sync SQLMetrics with `ShuffleReadMetrics` when the task has only one shuffle reader: just call `ShuffleMetricsReporter.incXXX` to achieve this. 2. But for multiple shuffle readers in a single task, we need to add `setXXX` functions to the `ShuffleMetricsReporter` trait, because we need to reset the SQLMetrics after `setMergeValues` is called by every shuffle reader. 3. I also tried to achieve this without changing `ShuffleMetricsReporter`, e.g. by calling `ShuffleMetricsReporter.incXXX` on the driver side at task end, but maybe this is not a good way. If you think it makes sense to change the `ShuffleMetricsReporter` trait, I'll push a commit soon.
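Point 2 is easiest to see with two readers in one task. The toy Python model below assumes, for illustration, that the task-level value is recomputed as the sum of all per-reader temp metrics each time it is merged; all class and function names are hypothetical. Adding the merged total to a SQL metric after each reader double counts, setting it requires a `setXXX`-style method, and forwarding each reader's own increments needs neither.

```python
class TempReadMetrics:
    """Hypothetical per-reader counter, loosely modeled on TempShuffleReadMetrics."""

    def __init__(self):
        self.records_read = 0

    def inc_records_read(self, v):
        self.records_read += v


class TaskReadMetrics:
    """Hypothetical task-level metrics, recomputed from all per-reader temps."""

    def __init__(self):
        self._temps = []
        self.records_read = 0

    def create_temp(self):
        temp = TempReadMetrics()
        self._temps.append(temp)
        return temp

    def set_merge_values(self):
        # Overwrites (does not add to) the task value with the sum over every reader so far.
        self.records_read = sum(t.records_read for t in self._temps)


def simulate(records_per_reader):
    """Return the SQL metric value under three hypothetical sync strategies."""
    task = TaskReadMetrics()
    inc_merged = 0  # inc(task total) after each reader: earlier readers are counted again
    set_merged = 0  # set(task total) after each reader: needs a setter on the reporter
    forwarded = 0   # forward each reader's own increments as they happen
    for n in records_per_reader:
        temp = task.create_temp()
        temp.inc_records_read(n)
        forwarded += n
        task.set_merge_values()
        inc_merged += task.records_read
        set_merged = task.records_read
    return inc_merged, set_merged, forwarded
```

For example, `simulate([3, 5])` returns `(11, 8, 8)`: with a single reader all three strategies agree, but with two readers the naive increment counts the first reader's 3 records twice.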
[GitHub] spark pull request #23105: [SPARK-26140] Enable custom metrics implementatio...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23105#discussion_r236036258 --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +/** + * An interface for reporting shuffle read metrics, for each shuffle. This interface assumes + * all the methods are called on a single-threaded, i.e. concrete implementations would not need + * to synchronize. + * + * All methods have additional Spark visibility modifier to allow public, concrete implementations + * that still have these methods marked as private[spark]. + */ +private[spark] trait ShuffleReadMetricsReporter { --- End diff -- https://github.com/apache/spark/pull/23128 :)
[GitHub] spark issue #23128: [SPARK-26142][SQL] Support passing shuffle metrics to ex...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23128 retest this please.
[GitHub] spark issue #23128: [SPARK-26142][SQL] Support passing shuffle metrics to ex...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/23128 @gatorsmile Thanks Xiao! Conflicts resolved. As Reynold commented in https://github.com/apache/spark/pull/23105#discussion_r235950427, when ShuffleMetricsReporter is renamed to ShuffleReadMetricsReporter in the write PR, it will conflict here again; I'll keep tracking the relevant PR.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236032855 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.metric + +import org.apache.spark.executor.TempShuffleReadMetrics + +/** + * A shuffle metrics reporter for SQL exchange operators. + * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext. + * @param metrics All metrics in current SparkPlan. + */ +class SQLShuffleMetricsReporter( + tempMetrics: TempShuffleReadMetrics, + metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics { + + override def incRemoteBlocksFetched(v: Long): Unit = { +metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED).add(v) --- End diff -- Sorry for not considering the per-row cost here; I should have been more careful. Fixed in cb46bfe.
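The diff looks up `metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED)` inside a method that can run per row. The actual fix in cb46bfe isn't shown here; a common remedy, sketched in Python under that assumption, is to resolve each metric once when the reporter is built. The metric-name strings are the `SQLMetrics` constants from the earlier diff; `Metric`, `create_shuffle_read_metrics`, and `SQLReadReporter` are hypothetical names, and forwarding to `TempShuffleReadMetrics` is omitted.

```python
REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
REMOTE_BYTES_READ = "remoteBytesRead"
REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
LOCAL_BYTES_READ = "localBytesRead"
FETCH_WAIT_TIME = "fetchWaitTime"
RECORDS_READ = "recordsRead"

ALL_READ_METRICS = (
    REMOTE_BLOCKS_FETCHED, LOCAL_BLOCKS_FETCHED, REMOTE_BYTES_READ,
    REMOTE_BYTES_READ_TO_DISK, LOCAL_BYTES_READ, FETCH_WAIT_TIME, RECORDS_READ,
)


class Metric:
    """Hypothetical stand-in for SQLMetric: an additive counter."""

    def __init__(self):
        self.value = 0

    def add(self, v):
        self.value += v


def create_shuffle_read_metrics():
    # One counter per metric name, keyed like the SQLMetrics constants.
    return {name: Metric() for name in ALL_READ_METRICS}


class SQLReadReporter:
    """Resolves each metric once at construction instead of on every update.

    Only two of the seven metrics are wired up here, for brevity.
    """

    def __init__(self, metrics):
        # A missing entry raises KeyError here, once, rather than inside the per-row path.
        self._remote_blocks_fetched = metrics[REMOTE_BLOCKS_FETCHED]
        self._records_read = metrics[RECORDS_READ]

    def inc_remote_blocks_fetched(self, v):
        self._remote_blocks_fetched.add(v)

    def inc_records_read(self, v):
        # Hot path: touches a pre-resolved attribute, no dictionary lookup.
        self._records_read.add(v)


metrics = create_shuffle_read_metrics()
reporter = SQLReadReporter(metrics)
for _ in range(1000):
    reporter.inc_records_read(1)
```

Resolving up front also turns a missing metric into an immediate construction-time error, which matches the Scaladoc expectation that the map contains every shuffle read metric.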
[GitHub] spark pull request #23128: [SPARK-26139][SQL] Support passing shuffle metric...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/23128 [SPARK-26139][SQL] Support passing shuffle metrics to exchange operator ## What changes were proposed in this pull request? Implement `SQLShuffleMetricsReporter` on the SQL side as the customized ShuffleMetricsReporter, which extends `TempShuffleReadMetrics` and updates SQLMetrics, so that shuffle metrics can be reported in the SQL UI. ## How was this patch tested? Add UT in SQLMetricsSuite. Manually tested locally. Before: ![image](https://user-images.githubusercontent.com/4833765/48960517-30f97880-efa8-11e8-982c-92d05938fd1d.png) After: ![image](https://user-images.githubusercontent.com/4833765/48960587-b54bfb80-efa8-11e8-8e95-7a3c8c74cc5c.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-26142 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23128.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23128 commit fba590f040c8fd7ce75df3f733f246db18e79ee6 Author: Reynold Xin Date: 2018-11-21T14:56:23Z [SPARK-26140] Pull TempShuffleReadMetrics creation out of shuffle reader commit 35b48b21028110742aed7f7f5b5d62109c2f0adf Author: Reynold Xin Date: 2018-11-21T15:02:04Z less movement of code commit 1b556ecf869685af8f34d448ac3f08102a758124 Author: liyuanjian Date: 2018-11-23T21:02:25Z [SPARK-26142] Implement shuffle read metric in SQL
[GitHub] spark issue #21363: [SPARK-19228][SQL] Migrate on Java 8 time from FastDateF...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21363 @MaxGekk Sorry for the delay, something came up in my schedule. I plan to start on this PR this weekend; if that's too late, please just take it over. Sorry again for the delay.
[GitHub] spark issue #22989: [SPARK-25986][Build] Add rules to ban throw Errors in ap...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22989 Thanks @HyukjinKwon @viirya @felixcheung @srowen for your review and advice!
[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r233432630 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala --- @@ -331,7 +333,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext { } } -object KMeansSuite extends SparkFunSuite { +object KMeansSuite extends SparkFunSuite with Assertions { --- End diff -- ah thanks! Done in 210d942.
[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r233432568 --- Diff: dev/checkstyle.xml --- @@ -180,5 +180,10 @@ + + + --- End diff -- `application code` as opposed to the JVM here, as Error subclasses generally represent internal JVM errors.
[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix barrier task run without Barr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22962 @HyukjinKwon No problem, I'll open a follow-up PR to address all your comments and rewrite the UT into a separate class.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r233283645 --- Diff: python/pyspark/taskcontext.py --- @@ -147,8 +147,8 @@ def __init__(self): @classmethod def _getOrCreate(cls): """Internal function to get or create global BarrierTaskContext.""" -if cls._taskContext is None: -cls._taskContext = BarrierTaskContext() +if not isinstance(cls._taskContext, BarrierTaskContext): +cls._taskContext = object.__new__(cls) --- End diff -- ``` could you add some comments to explain it? ``` @cloud-fan Sorry for not explaining enough; I should have added more comments in the first place, and will do so in the follow-up PR. ``` Can we get rid of the rewrite all? we should remove __init__ too next time please fully describe what's going on in PR description ``` @HyukjinKwon Sorry for not explaining enough; all of this, together with the new UT, will be done in the next follow-up PR.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r233055939 --- Diff: python/pyspark/taskcontext.py --- @@ -147,8 +147,8 @@ def __init__(self): @classmethod def _getOrCreate(cls): """Internal function to get or create global BarrierTaskContext.""" -if cls._taskContext is None: -cls._taskContext = BarrierTaskContext() +if not isinstance(cls._taskContext, BarrierTaskContext): +cls._taskContext = object.__new__(cls) --- End diff -- I think the answer is no; maybe I was not clear enough in my previous explanation (https://github.com/apache/spark/pull/22962#discussion_r232528333). Using `BarrierTaskContext()` here was my first commit (https://github.com/apache/spark/pull/22962/commits/0cb2cf6e9ece66861073c31b579b595a9de5ce81), and that version also needed `__new__` to be overridden in `BarrierTaskContext`. Otherwise the bug still exists, because the parent class `TaskContext` overrides `__new__()`: when we call `BarrierTaskContext()` here in a reused worker, `TaskContext.__new__()` returns the already cached `TaskContext` instance: https://github.com/apache/spark/blob/c00e72f3d7530eb2ae43d4d45e8efde783daf6ff/python/pyspark/taskcontext.py#L47
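A minimal, self-contained sketch of the failure described above. It assumes simplified stand-ins for `TaskContext` and `BarrierTaskContext`, modeled on the singleton `__new__` in the linked `taskcontext.py`; these are not the real PySpark classes, and `barrier()` is only a stub. In a reused worker an ordinary task has already cached a plain `TaskContext`, so even with an `isinstance` check, calling `BarrierTaskContext()` gets that cached object back from the inherited `__new__`:

```python
class TaskContext(object):
    """Simplified stand-in for PySpark's TaskContext (illustration only)."""
    _taskContext = None

    def __new__(cls):
        # Singleton: hand back whatever context is already cached.
        taskContext = cls._taskContext
        if taskContext is not None:
            return taskContext
        cls._taskContext = taskContext = object.__new__(cls)
        return taskContext

    @classmethod
    def _getOrCreate(cls):
        if cls._taskContext is None:
            cls._taskContext = TaskContext()
        return cls._taskContext


class BarrierTaskContext(TaskContext):
    """Simplified stand-in; barrier() is a stub."""

    @classmethod
    def _getOrCreate(cls):
        # The type check alone is not enough while we still call the
        # constructor: the inherited __new__ returns the cached TaskContext.
        if not isinstance(cls._taskContext, BarrierTaskContext):
            cls._taskContext = BarrierTaskContext()
        return cls._taskContext

    def barrier(self):
        return "barrier"


# First task in the reused worker: an ordinary task caches a TaskContext.
TaskContext._getOrCreate()

# Second task in the same worker: a barrier task.
ctx = BarrierTaskContext._getOrCreate()
print(type(ctx).__name__)       # TaskContext
print(hasattr(ctx, "barrier"))  # False, so ctx.barrier() raises AttributeError
```

This is why the first-commit approach also needed its own `__new__` override on the subclass.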
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r232986340 --- Diff: python/pyspark/tests.py --- @@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self): """ Verify that BarrierTaskContext.barrier() with reused python worker. """ +self.sc._conf.set("spark.python.work.reuse", "true") --- End diff -- @HyukjinKwon Hi Hyukjin, if you still think this needs a separate class, I'll work out a way to check for worker reuse and open a follow-up PR.
[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix barrier task run without Barr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22962 Thanks @gatorsmile @HyukjinKwon @cloud-fan!
[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r232984147 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala --- @@ -283,7 +283,9 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging { points.zip(rows.map(_(0))).foreach { case (orig: SparseVector, indexed: SparseVector) => assert(orig.indices.length == indexed.indices.length) - case _ => throw new UnknownError("Unit test has a bug in it.") // should never happen + case _ => +// should never happen +throw new IllegalAccessException("Unit test has a bug in it.") --- End diff -- Thanks, done in a4f49ce by simply calling `fail` here.
[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r232983941 --- Diff: dev/checkstyle-suppressions.xml --- @@ -46,4 +46,12 @@ files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/> +
[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r232955017 --- Diff: dev/checkstyle-suppressions.xml --- @@ -46,4 +46,12 @@ files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/> +
[GitHub] spark issue #22989: [SPARK-25986][Build] Add rules to ban throw Errors in ap...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22989 @srowen Many thanks for your guidance. I addressed all your suggestions in ff234d3 and updated the record table in https://github.com/apache/spark/pull/22989#issuecomment-437939830.
[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r232722383 --- Diff: dev/checkstyle.xml --- @@ -64,6 +64,11 @@ + + --- End diff -- Thanks, done in 6e49bb8.
[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r232722184 --- Diff: scalastyle-config.xml --- @@ -240,6 +240,18 @@ This file is divided into 3 sections: ]]> + --- End diff -- Thanks, the new rule is named `throwerror`.
[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r232721829 --- Diff: dev/checkstyle.xml --- @@ -64,6 +64,11 @@ + + + --- End diff -- Thanks, rewrote this rule in 6e49bb8.
[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r232721412 --- Diff: scalastyle-config.xml --- @@ -240,6 +240,18 @@ This file is divided into 3 sections: ]]> + +throw new OutOfMemoryError +
[GitHub] spark issue #22989: [SPARK-25986][Build] Banning throw new OutOfMemoryErrors
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22989 cc all reviewers: following @srowen's suggestion, I added a rule to ban all `throw new ...Error` cases. Below are the current `throw new XXXError` occurrences in the Spark source, with whether each one is fixed up or excluded, for the convenience of review:

| ErrorName | Count | In Test/Source Code | Fix Up or Exclude |
| --- | --- | --- | --- |
| AssertionError | 32 | 29 in test code | Exclude |
| | | 2 in UnsafeAlignedOffset | Fix up by changing them to `IllegalArgumentException` |
| | | 1 in KafkaUtils | Exclude, because fixing it would change behavior |
| NotImplementedError | 22 | 7 in source code / 15 in test code | Fix up by changing them to `UnsupportedOperationException` |
| OutOfMemoryError | 1 | 1 in test code | Exclude |
| LinkageError | 1 | 1 in test code | Exclude |
| SparkOutOfMemoryError | 5 | 5 in source code | Exclude |
| UnknownError | 9 | 5 in source code / 4 in test code | Fix up by changing them to `IllegalAccessException` and `IllegalArgumentException` |

All of the above is done in 6e49bb8.
[GitHub] spark issue #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFInJoinCo...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22955 Thanks @mgaido91 @cloud-fan
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r232634698 --- Diff: python/pyspark/tests.py --- @@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self): """ Verify that BarrierTaskContext.barrier() with reused python worker. """ +self.sc._conf.set("spark.python.work.reuse", "true") --- End diff -- I did the following two checks: 1. Running this test case without the fix in `BarrierTaskContext._getOrCreate` reproduces the bug. 2. Running the same code in the pyspark shell with `spark.python.work.reuse=false` returns successfully. I think this shows that the UT covers the issue while still reusing the original barrier test code. WDYT?
[GitHub] spark issue #22989: [SPARK-25986][Build] Banning throw new OutOfMemoryErrors
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22989 Sorry for the late reply, and many thanks to all the reviewers for their advice; I'll address it soon.
[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix barrier task run without Barr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22962 @HyukjinKwon Thanks for your review. I've addressed the comments and updated the PR description and title.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r232528333 --- Diff: python/pyspark/taskcontext.py --- @@ -144,10 +144,19 @@ def __init__(self): """Construct a BarrierTaskContext, use get instead""" pass +def __new__(cls): --- End diff -- Yep, doing this in `_getOrCreate` has the same effect; overriding `__new__` here was me overthinking https://github.com/apache/spark/blob/aec0af4a952df2957e21d39d1e0546a36ab7ab86/python/pyspark/taskcontext.py#L44-L45. Deleted in 02555b8.
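A sketch of why a separate `__new__` override is unnecessary once `_getOrCreate` allocates the instance itself. Again this assumes simplified stand-in classes, not PySpark's, with `barrier()` as a stub. `object.__new__(cls)` bypasses the singleton `TaskContext.__new__`, and assigning to `cls._taskContext` creates an attribute on the subclass that shadows, rather than overwrites, the parent's cached context:

```python
class TaskContext(object):
    """Simplified stand-in for PySpark's TaskContext (illustration only)."""
    _taskContext = None

    def __new__(cls):
        # Singleton: hand back whatever context is already cached.
        taskContext = cls._taskContext
        if taskContext is not None:
            return taskContext
        cls._taskContext = taskContext = object.__new__(cls)
        return taskContext

    @classmethod
    def _getOrCreate(cls):
        if cls._taskContext is None:
            cls._taskContext = TaskContext()
        return cls._taskContext


class BarrierTaskContext(TaskContext):
    """Simplified stand-in; barrier() is a stub."""

    @classmethod
    def _getOrCreate(cls):
        if not isinstance(cls._taskContext, BarrierTaskContext):
            # Allocate directly, skipping the inherited singleton __new__.
            cls._taskContext = object.__new__(cls)
        return cls._taskContext

    def barrier(self):
        return "barrier"


plain = TaskContext._getOrCreate()  # an earlier ordinary task in the worker
barrier_ctx = BarrierTaskContext._getOrCreate()

print(barrier_ctx.barrier())                            # barrier
print(BarrierTaskContext._getOrCreate() is barrier_ctx)  # True
print(TaskContext._taskContext is plain)                # True
```

The subclass keeps its own singleton, and the plain context cached by earlier tasks is left untouched.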
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r232527808 --- Diff: python/pyspark/tests.py --- @@ -614,6 +614,18 @@ def context_barrier(x): times = rdd.barrier().mapPartitions(f).map(context_barrier).collect() self.assertTrue(max(times) - min(times) < 1) +def test_barrier_with_python_worker_reuse(self): +""" +Verify that BarrierTaskContext.barrier() with reused python worker. +""" +rdd = self.sc.parallelize(range(4), 4) --- End diff -- Thanks, done in 02555b8.
[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22955#discussion_r232489060 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.scalatest.Matchers._ + +import org.apache.spark.api.python.PythonEvalType +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.PythonUDF +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.internal.SQLConf._ +import org.apache.spark.sql.types.BooleanType + +class PullOutPythonUDFInJoinConditionSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Extract PythonUDF From JoinCondition", Once, +PullOutPythonUDFInJoinCondition) :: + Batch("Check Cartesian Products", Once, +CheckCartesianProducts) :: Nil + } + + val testRelationLeft = LocalRelation('a.int, 'b.int) + val testRelationRight = LocalRelation('c.int, 'd.int) + + // Dummy python UDF for testing. Unable to execute. 
+ val pythonUDF = PythonUDF("pythonUDF", null, +BooleanType, +Seq.empty, +PythonEvalType.SQL_BATCHED_UDF, +udfDeterministic = true) + + val unsupportedJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti) + + private def comparePlansWithConf(query: LogicalPlan, expected: LogicalPlan): Unit = { +// AnalysisException thrown by CheckCartesianProducts while spark.sql.crossJoin.enabled=false +val exception = intercept[AnalysisException] { + Optimize.execute(query.analyze) +} +assert(exception.message.startsWith("Detected implicit cartesian product")) + +// pull out the python udf while set spark.sql.crossJoin.enabled=true +withSQLConf(CROSS_JOINS_ENABLED.key -> "true") { + val optimized = Optimize.execute(query.analyze) + comparePlans(optimized, expected) +} + } + + test("inner join condition with python udf only") { --- End diff -- Sorry for not replying to your previous comment `they differ only by the join type...`: they differ not only in the join type but also in the expected plan.
[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22955#discussion_r232488956 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala --- @@ -50,20 +50,11 @@ class PullOutPythonUDFInJoinConditionSuite extends PlanTest { PythonEvalType.SQL_BATCHED_UDF, udfDeterministic = true) - val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti) - - test("inner join condition with python udf only") { -val query = testRelationLeft.join( - testRelationRight, - joinType = Inner, - condition = Some(pythonUDF)) -val expected = testRelationLeft.join( - testRelationRight, - joinType = Inner, - condition = None).where(pythonUDF).analyze + val unsupportedJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti) + private def comparePlansWithConf(query: LogicalPlan, expected: LogicalPlan): Unit = { --- End diff -- How about `comparePlanWithCrossJoinEnable`? I was just afraid it would be too long at first. Any advice? :) Thanks.
[GitHub] spark issue #21363: [SPARK-19228][SQL] Migrate on Java 8 time from FastDateF...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21363 @HyukjinKwon Many thanks for pinging me. I'll try to work on this and cc all the reviewers of this PR.
[GitHub] spark issue #22989: [SPARK-25986][Build] Banning throw new OutOfMemoryErrors
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22989 retest this please.
[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22955#discussion_r232163956 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.scalatest.Matchers._ + +import org.apache.spark.api.python.PythonEvalType +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.PythonUDF +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.internal.SQLConf._ +import org.apache.spark.sql.types.BooleanType + +class PullOutPythonUDFInJoinConditionSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Extract PythonUDF From JoinCondition", Once, +PullOutPythonUDFInJoinCondition) :: + Batch("Check Cartesian Products", Once, +CheckCartesianProducts) :: Nil + } + + val testRelationLeft = LocalRelation('a.int, 'b.int) + val testRelationRight = LocalRelation('c.int, 'd.int) + + // Dummy python UDF for testing. Unable to execute. 
+ val pythonUDF = PythonUDF("pythonUDF", null, +BooleanType, +Seq.empty, +PythonEvalType.SQL_BATCHED_UDF, +udfDeterministic = true) + + val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti) + + test("inner join condition with python udf only") { +val query = testRelationLeft.join( + testRelationRight, + joinType = Inner, + condition = Some(pythonUDF)) +val expected = testRelationLeft.join( + testRelationRight, + joinType = Inner, + condition = None).where(pythonUDF).analyze + +// AnalysisException thrown by CheckCartesianProducts while spark.sql.crossJoin.enabled=false +val exception = the [AnalysisException] thrownBy { + Optimize.execute(query.analyze) +} +assert(exception.message.startsWith("Detected implicit cartesian product")) + +// pull out the python udf while set spark.sql.crossJoin.enabled=true +withSQLConf(CROSS_JOINS_ENABLED.key -> "true") { + val optimized = Optimize.execute(query.analyze) + comparePlans(optimized, expected) +} + } + + test("left semi join condition with python udf only") { +val query = testRelationLeft.join( + testRelationRight, + joinType = LeftSemi, + condition = Some(pythonUDF)) +val expected = testRelationLeft.join( + testRelationRight, + joinType = Inner, + condition = None).where(pythonUDF).select('a, 'b).analyze + +// AnalysisException thrown by CheckCartesianProducts while spark.sql.crossJoin.enabled=false +val exception = the [AnalysisException] thrownBy { + Optimize.execute(query.analyze) +} +assert(exception.message.startsWith("Detected implicit cartesian product")) + +// pull out the python udf while set spark.sql.crossJoin.enabled=true +withSQLConf(CROSS_JOINS_ENABLED.key -> "true") { + val optimized = Optimize.execute(query.analyze) + comparePlans(optimized, expected) +} + } + + test("python udf with other common condition") { --- End diff -- Thanks, added more cases in 38b1555.
[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22955#discussion_r232163715 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.scalatest.Matchers._ + +import org.apache.spark.api.python.PythonEvalType +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.PythonUDF +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.internal.SQLConf._ +import org.apache.spark.sql.types.BooleanType + +class PullOutPythonUDFInJoinConditionSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Extract PythonUDF From JoinCondition", Once, +PullOutPythonUDFInJoinCondition) :: + Batch("Check Cartesian Products", Once, +CheckCartesianProducts) :: Nil + } + + val testRelationLeft = LocalRelation('a.int, 'b.int) + val testRelationRight = LocalRelation('c.int, 'd.int) + + // Dummy python UDF for testing. Unable to execute. + val pythonUDF = PythonUDF("pythonUDF", null, +BooleanType, +Seq.empty, +PythonEvalType.SQL_BATCHED_UDF, +udfDeterministic = true) + + val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti) --- End diff -- Thanks, done in 38b1555.
[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22955#discussion_r232163787 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.scalatest.Matchers._ + +import org.apache.spark.api.python.PythonEvalType +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.PythonUDF +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.internal.SQLConf._ +import org.apache.spark.sql.types.BooleanType + +class PullOutPythonUDFInJoinConditionSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Extract PythonUDF From JoinCondition", Once, +PullOutPythonUDFInJoinCondition) :: + Batch("Check Cartesian Products", Once, +CheckCartesianProducts) :: Nil + } + + val testRelationLeft = LocalRelation('a.int, 'b.int) + val testRelationRight = LocalRelation('c.int, 'd.int) + + // Dummy python UDF for testing. Unable to execute. + val pythonUDF = PythonUDF("pythonUDF", null, +BooleanType, +Seq.empty, +PythonEvalType.SQL_BATCHED_UDF, +udfDeterministic = true) + + val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti) + + test("inner join condition with python udf only") { --- End diff -- Sorry about this, done in 38b1555.
[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22955#discussion_r232163738 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.scalatest.Matchers._ + +import org.apache.spark.api.python.PythonEvalType +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.PythonUDF +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.internal.SQLConf._ +import org.apache.spark.sql.types.BooleanType + +class PullOutPythonUDFInJoinConditionSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Extract PythonUDF From JoinCondition", Once, +PullOutPythonUDFInJoinCondition) :: + Batch("Check Cartesian Products", Once, +CheckCartesianProducts) :: Nil + } + + val testRelationLeft = LocalRelation('a.int, 'b.int) + val testRelationRight = LocalRelation('c.int, 'd.int) + + // Dummy python UDF for testing. Unable to execute. + val pythonUDF = PythonUDF("pythonUDF", null, +BooleanType, +Seq.empty, +PythonEvalType.SQL_BATCHED_UDF, +udfDeterministic = true) + + val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti) + + test("inner join condition with python udf only") { +val query = testRelationLeft.join( + testRelationRight, + joinType = Inner, + condition = Some(pythonUDF)) +val expected = testRelationLeft.join( + testRelationRight, + joinType = Inner, + condition = None).where(pythonUDF).analyze + +// AnalysisException thrown by CheckCartesianProducts while spark.sql.crossJoin.enabled=false +val exception = the [AnalysisException] thrownBy { --- End diff -- Thanks, done in 38b1555.
[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22989 [SPARK-25986][Build] Banning throw new OutOfMemoryErrors ## What changes were proposed in this pull request? Add Scala and Java lint check rules to ban the usage of `throw new OutOfMemoryErrors`, because it causes the whole executor to be killed. See more details in https://github.com/apache/spark/pull/22969. ## How was this patch tested? Local test with lint-scala and lint-java. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-25986 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22989.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22989 commit d67875115f622082519b1dbcb1c1e34c2184b34f Author: Yuanjian Li Date: 2018-11-09T05:23:01Z banning throw new OutOfMemoryErrors
[GitHub] spark issue #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFInJoinCo...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22955 Thanks for the reply. The unnecessary end-to-end tests were removed in https://github.com/apache/spark/pull/22326/commits/2b6977de4a3b3489b9c2172a6a8a39831bf1d048. Should the others perhaps be kept? A mocked Python UDF on the Scala side can't behave 100% the same as a real one on the Python side.
[GitHub] spark issue #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFInJoinCo...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22955 cc @cloud-fan @mgaido91
[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix BarrierTaskContext while pyth...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22962 cc @cloud-fan @gatorsmile
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix BarrierTaskContext whi...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22962

[SPARK-25921][PySpark] Fix BarrierTaskContext while python worker reuse

## What changes were proposed in this pull request?

When Python workers are reused, `BarrierTaskContext._getOrCreate()` can still return a plain `TaskContext`, so calling `barrier()` on it fails with `AttributeError: 'TaskContext' object has no attribute 'barrier'`. Fix this by adding a type check in `BarrierTaskContext._getOrCreate()` and overriding the `__new__` method of `BarrierTaskContext`.

## How was this patch tested?

Added a new UT in pyspark-core.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25921

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22962.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22962

commit 0cb2cf6e9ece66861073c31b579b595a9de5ce81
Author: Yuanjian Li
Date: 2018-11-07T10:01:54Z

fix BarrierTaskContext while python worker reuse
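The failure described in #22962 can be reproduced with plain Python class attributes. In the simplified, hypothetical model below (the names `TaskContext` and `_getOrCreate` mirror PySpark's, but these classes are not PySpark's code, and the model ignores the `__new__` override the PR also changes), a context object is cached in a class attribute of the base class. A subclass that only checks for `None` picks up a plain base-class instance cached by an earlier task in the same worker, while an `isinstance` check creates a context of the right type.

```python
class TaskContext:
    """Simplified stand-in: one cached context per worker process."""

    _taskContext = None  # class attribute, visible to subclasses via the MRO

    @classmethod
    def _getOrCreate(cls):
        if cls._taskContext is None:
            cls._taskContext = cls()
        return cls._taskContext


class BuggyBarrierTaskContext(TaskContext):
    # Inherits the None check: if TaskContext._taskContext is already set,
    # cls._taskContext resolves to that plain TaskContext and is returned as-is.
    def barrier(self):
        return "barrier reached"


class FixedBarrierTaskContext(TaskContext):
    @classmethod
    def _getOrCreate(cls):
        # Reuse the cached object only if it really is a barrier context.
        if not isinstance(cls._taskContext, cls):
            cls._taskContext = cls()
        return cls._taskContext

    def barrier(self):
        return "barrier reached"


# Simulate a reused worker: an earlier, non-barrier task cached a plain TaskContext.
TaskContext._getOrCreate()

ctx = BuggyBarrierTaskContext._getOrCreate()
print(type(ctx).__name__)  # prints TaskContext; ctx.barrier() would raise AttributeError
print(FixedBarrierTaskContext._getOrCreate().barrier())  # prints barrier reached
```

Assigning `cls._taskContext` inside the subclass's classmethod stores the new object on the subclass itself, so the fixed version no longer shadows or overwrites the base-class cache.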
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for ContextBarrierSta...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 gentle ping @jiangxb1987
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for ContextBarrierSta...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 retest this please.
[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22955

[SPARK-25949][SQL] Add test for PullOutPythonUDFInJoinCondition

## What changes were proposed in this pull request?

As mentioned in https://github.com/apache/spark/pull/22326#issuecomment-424923967, the newly added optimizer rule is currently tested only by end-to-end tests on the Python side. We need to add a test suite under `org.apache.spark.sql.catalyst.optimizer`, as exists for other optimizer rules.

## How was this patch tested?

Newly added UT.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25949

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22955.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22955

commit 344b516a414107cf5494951ae74886b6f62f3e5a
Author: Yuanjian Li
Date: 2018-11-06T12:29:26Z

Add test for PullOutPythonUDFInJoinCondition