[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r240092271 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -144,24 +282,107 @@ case class WindowInPandasExec( queue.close() } - val inputProj = UnsafeProjection.create(allInputs, child.output) - val pythonInput = grouped.map { case (_, rows) => -rows.map { row => - queue.add(row.asInstanceOf[UnsafeRow]) - inputProj(row) + val stream = iter.map { row => +queue.add(row.asInstanceOf[UnsafeRow]) +row + } + + val pythonInput = new Iterator[Iterator[UnsafeRow]] { + +// Manage the stream and the grouping. +var nextRow: UnsafeRow = null +var nextGroup: UnsafeRow = null +var nextRowAvailable: Boolean = false +private[this] def fetchNextRow() { + nextRowAvailable = stream.hasNext + if (nextRowAvailable) { +nextRow = stream.next().asInstanceOf[UnsafeRow] +nextGroup = grouping(nextRow) + } else { +nextRow = null +nextGroup = null + } +} +fetchNextRow() + +// Manage the current partition. +val buffer: ExternalAppendOnlyUnsafeRowArray = + new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) +var bufferIterator: Iterator[UnsafeRow] = _ + +val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType)) + +val frames = factories.map(_(indexRow)) + +private[this] def fetchNextPartition() { + // Collect all the rows in the current partition. + // Before we start to fetch new input rows, make a copy of nextGroup. + val currentGroup = nextGroup.copy() + + // clear last partition + buffer.clear() + + while (nextRowAvailable && nextGroup == currentGroup) { --- End diff -- Thanks! Could you submit the PR to fix `WindowExec` please? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23245: [SPARK-26060][SQL][FOLLOW-UP] Rename the config name.
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23245 Jenkins, retest this please.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239666242 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -144,24 +282,107 @@ case class WindowInPandasExec( queue.close() } - val inputProj = UnsafeProjection.create(allInputs, child.output) - val pythonInput = grouped.map { case (_, rows) => -rows.map { row => - queue.add(row.asInstanceOf[UnsafeRow]) - inputProj(row) + val stream = iter.map { row => +queue.add(row.asInstanceOf[UnsafeRow]) +row + } + + val pythonInput = new Iterator[Iterator[UnsafeRow]] { + +// Manage the stream and the grouping. +var nextRow: UnsafeRow = null +var nextGroup: UnsafeRow = null +var nextRowAvailable: Boolean = false +private[this] def fetchNextRow() { + nextRowAvailable = stream.hasNext + if (nextRowAvailable) { +nextRow = stream.next().asInstanceOf[UnsafeRow] +nextGroup = grouping(nextRow) + } else { +nextRow = null +nextGroup = null + } +} +fetchNextRow() + +// Manage the current partition. +val buffer: ExternalAppendOnlyUnsafeRowArray = + new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) +var bufferIterator: Iterator[UnsafeRow] = _ + +val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType)) + +val frames = factories.map(_(indexRow)) + +private[this] def fetchNextPartition() { + // Collect all the rows in the current partition. + // Before we start to fetch new input rows, make a copy of nextGroup. + val currentGroup = nextGroup.copy() + + // clear last partition + buffer.clear() + + while (nextRowAvailable && nextGroup == currentGroup) { --- End diff -- Oh, we need to fix the original `WindowExec` as well, don't we? How about fixing it in a separate PR first, to get feedback from other reviewers who are familiar with window functions, and then following that PR here?
[GitHub] spark pull request #23245: [SPARK-26060][SQL][FOLLOW-UP] Rename the config n...
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23245

[SPARK-26060][SQL][FOLLOW-UP] Rename the config name.

## What changes were proposed in this pull request?

This is a follow-up of #23031 to rename the config name. The `execution` category might not be needed.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26060/rename_config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23245.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #23245

commit 021134cd2b6a0a82ef8ef36a5ce122bff397ab32
Author: Takuya UESHIN
Date: 2018-12-06T09:24:30Z

Rename the config name.
[GitHub] spark issue #23245: [SPARK-26060][SQL][FOLLOW-UP] Rename the config name.
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23245 cc @cloud-fan
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239312302 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -144,24 +282,107 @@ case class WindowInPandasExec( queue.close() } - val inputProj = UnsafeProjection.create(allInputs, child.output) - val pythonInput = grouped.map { case (_, rows) => -rows.map { row => - queue.add(row.asInstanceOf[UnsafeRow]) - inputProj(row) + val stream = iter.map { row => +queue.add(row.asInstanceOf[UnsafeRow]) +row + } + + val pythonInput = new Iterator[Iterator[UnsafeRow]] { + +// Manage the stream and the grouping. +var nextRow: UnsafeRow = null +var nextGroup: UnsafeRow = null +var nextRowAvailable: Boolean = false +private[this] def fetchNextRow() { + nextRowAvailable = stream.hasNext + if (nextRowAvailable) { +nextRow = stream.next().asInstanceOf[UnsafeRow] +nextGroup = grouping(nextRow) + } else { +nextRow = null +nextGroup = null + } +} +fetchNextRow() + +// Manage the current partition. +val buffer: ExternalAppendOnlyUnsafeRowArray = + new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) +var bufferIterator: Iterator[UnsafeRow] = _ + +val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType)) + +val frames = factories.map(_(indexRow)) + +private[this] def fetchNextPartition() { + // Collect all the rows in the current partition. + // Before we start to fetch new input rows, make a copy of nextGroup. + val currentGroup = nextGroup.copy() + + // clear last partition + buffer.clear() + + while (nextRowAvailable && nextGroup == currentGroup) { --- End diff -- I guess we have to use `GenerateOrdering.compare()` to support complex types as well, as `GroupedIterator` does.
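The loop under review splits a sorted row stream into partitions by comparing each incoming grouping key with the current one. A minimal Python sketch of that pattern, with the equality check replaced by a pluggable comparator, might look like the following. The names `partitions`, `key_of`, `compare`, and `compare_keys` are hypothetical; the comparator only stands in for the role the comment gives to `GenerateOrdering.compare()` and is not Spark's API.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

Row = TypeVar("Row")
Key = TypeVar("Key")


def partitions(
    rows: Iterable[Row],
    key_of: Callable[[Row], Key],
    compare: Callable[[Key, Key], int],
) -> Iterator[List[Row]]:
    """Yield runs of consecutive rows whose keys compare as equal.

    Assumes the input is already sorted (or at least clustered) by key.
    """
    it = iter(rows)
    try:
        next_row = next(it)
    except StopIteration:
        return  # empty input: no partitions

    while True:
        current_key = key_of(next_row)
        buffer = [next_row]
        for next_row in it:
            if compare(key_of(next_row), current_key) != 0:
                break  # next_row is the first row of the following partition
            buffer.append(next_row)
        else:
            # The stream is exhausted: emit the last partition and stop.
            yield buffer
            return
        yield buffer


def compare_keys(a, b) -> int:
    """Hypothetical ordering-style comparator: negative, zero, or positive."""
    return (a > b) - (a < b)


# Keys here are lists, standing in for a "complex type" grouping key.
rows = [([1, 2], "a"), ([1, 2], "b"), ([3], "c")]
groups = list(partitions(rows, key_of=lambda r: r[0], compare=compare_keys))
```

Passing the comparator in keeps the grouping loop independent of how keys are compared, which is the point of the suggestion: the loop stays the same while the comparison becomes type-aware.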
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239307965 --- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py --- @@ -231,12 +266,10 @@ def test_array_type(self): self.assertEquals(result1.first()['v2'], [1.0, 2.0]) def test_invalid_args(self): -from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql.functions import mean, pandas_udf, PandasUDFType --- End diff -- ditto.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239307779 --- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py --- @@ -87,8 +96,34 @@ def ordered_window(self): def unpartitioned_window(self): return Window.partitionBy() +@property +def sliding_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1) + +@property +def sliding_range_window(self): +return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4) + +@property +def growing_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3) + +@property +def growing_range_window(self): +return Window.partitionBy('id').orderBy('v') \ +.rangeBetween(Window.unboundedPreceding, 4) + +@property +def shrinking_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(-2, Window.unboundedFollowing) + +@property +def shrinking_range_window(self): +return Window.partitionBy('id').orderBy('v') \ +.rangeBetween(-3, Window.unboundedFollowing) + def test_simple(self): -from pyspark.sql.functions import mean +from pyspark.sql.functions import pandas_udf, PandasUDFType, percent_rank, mean, max --- End diff -- ditto. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
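The window properties added in this diff cover sliding, growing, and shrinking frames, in both row-based (`rowsBetween`) and range-based (`rangeBetween`) form. As a rough illustration of what those bounds select, here is a pure-Python sketch that computes a mean per frame over a single partition already sorted by `v`. The helper names are hypothetical and `None` stands in for `Window.unboundedPreceding` / `Window.unboundedFollowing`; this is not how Spark evaluates frames internally.

```python
from typing import List, Optional, Sequence


def _mean(frame: Sequence[float]) -> Optional[float]:
    # An empty frame has no mean; None plays the role of SQL NULL here.
    return sum(frame) / len(frame) if frame else None


def rows_frame_mean(
    values: Sequence[float], start: Optional[int], end: Optional[int]
) -> List[Optional[float]]:
    """Row-based frame: offsets count rows relative to the current row."""
    n = len(values)
    out = []
    for i in range(n):
        lo = 0 if start is None else max(0, i + start)
        hi = n - 1 if end is None else min(n - 1, i + end)
        out.append(_mean(values[lo:hi + 1]))
    return out


def range_frame_mean(
    values: Sequence[float], lower: Optional[float], upper: Optional[float]
) -> List[Optional[float]]:
    """Range-based frame: offsets are added to the current row's value."""
    out = []
    for v in values:
        frame = [
            x for x in values
            if (lower is None or x >= v + lower)
            and (upper is None or x <= v + upper)
        ]
        out.append(_mean(frame))
    return out


v = [1, 2, 4, 8]
sliding_rows = rows_frame_mean(v, -2, 1)     # like rowsBetween(-2, 1)
sliding_range = range_frame_mean(v, -2, 4)   # like rangeBetween(-2, 4)
```

With unevenly spaced values the two frame kinds pick different rows for the same offsets, which is why the tests exercise both variants.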
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239308506 --- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py --- @@ -245,11 +278,101 @@ def test_invalid_args(self): foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP) df.withColumn('v2', foo_udf(df['v']).over(w)) -with QuietTest(self.sc): -with self.assertRaisesRegexp( -AnalysisException, -'.*Only unbounded window frame is supported.*'): -df.withColumn('mean_v', mean_udf(df['v']).over(ow)) +def test_bounded_simple(self): +from pyspark.sql.functions import mean, max, min, count + +df = self.data +w1 = self.sliding_row_window +w2 = self.shrinking_range_window + +plus_one = self.python_plus_one +count_udf = self.pandas_agg_count_udf +mean_udf = self.pandas_agg_mean_udf +max_udf = self.pandas_agg_max_udf +min_udf = self.pandas_agg_min_udf + +result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1)) \ +.withColumn('count_v', count_udf(df['v']).over(w2)) \ +.withColumn('max_v', max_udf(df['v']).over(w2)) \ +.withColumn('min_v', min_udf(df['v']).over(w1)) + +expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1)) \ +.withColumn('count_v', count(df['v']).over(w2)) \ +.withColumn('max_v', max(df['v']).over(w2)) \ +.withColumn('min_v', min(df['v']).over(w1)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_growing_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = self.growing_row_window +w2 = self.growing_range_window + +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_sliding_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = 
self.sliding_row_window +w2 = self.sliding_range_window + +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_shrinking_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = self.shrinking_row_window +w2 = self.shrinking_range_window + +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_bounded_mixed(self): +from pyspark.sql.functions import mean, max, min, count --- End diff -- We don't need min and count? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r239307483 --- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py --- @@ -44,9 +44,18 @@ def python_plus_one(self): @property def pandas_scalar_time_two(self): -from pyspark.sql.functions import pandas_udf +from pyspark.sql.functions import pandas_udf, PandasUDFType --- End diff -- nit: can we revert this change?
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r238146834 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -64,7 +85,7 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable i = 0 while (i < validExprs.length) { val (_, ordinal) = validExprs(i) - mutableRow(ordinal) = buffer(ordinal) + fieldWriters(i)(buffer(ordinal)) --- End diff -- Since `fieldWriters` is accessed by index, should we use `IndexedSeq` or `Array` explicitly?
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237826533 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala --- @@ -293,6 +293,54 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { } } + test("INSET: binary") { --- End diff -- Submitted #23187.
[GitHub] spark pull request #23187: [SPARK-26211][SQL][FOLLOW-UP] Combine test cases ...
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23187

[SPARK-26211][SQL][FOLLOW-UP] Combine test cases for `In` and `InSet`.

## What changes were proposed in this pull request?

This is a follow-up PR of #23176. `In` and `InSet` are semantically equal, so the tests for `In` should pass with `InSet`, and vice versa. This combines those test cases.

## How was this patch tested?

The combined tests and existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26211/in_inset_tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23187.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #23187

commit a87cb7739a0d9e864d8a03376962aa06a99cc100
Author: Takuya UESHIN
Date: 2018-11-30T11:00:02Z

Combine test cases for `In` and `InSet`.
[GitHub] spark issue #23187: [SPARK-26211][SQL][FOLLOW-UP] Combine test cases for `In...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23187 cc @gatorsmile @cloud-fan
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237771176 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala --- @@ -293,6 +293,54 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { } } + test("INSET: binary") { --- End diff -- Sure, I'll do it later. Thanks!
[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23031 Jenkins, retest this please.
[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23031 Jenkins, retest this please.
[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23031 Jenkins, retest this please.
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237400321 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with } @transient lazy val set: Set[Any] = child.dataType match { -case _: AtomicType => hset +case t: AtomicType if !t.isInstanceOf[BinaryType] => hset case _: NullType => hset case _ => + val ord = TypeUtils.getInterpretedOrdering(child.dataType) + val ordering = if (hasNull) { +new Ordering[Any] { + override def compare(x: Any, y: Any): Int = { +if (x == null && y == null) { + 0 +} else if (x == null) { + -1 +} else if (y == null) { + 1 +} else { + ord.compare(x, y) +} + } +} + } else { +ord + } // for structs use interpreted ordering to be able to compare UnsafeRows with non-UnsafeRows - TreeSet.empty(TypeUtils.getInterpretedOrdering(child.dataType)) ++ hset + TreeSet.empty(ordering) ++ hset --- End diff -- Actually, we are using `nullSafeEval`, so we don't need to update it. Instead, I'm updating it to use `nullSafeCodeGen` for the codegen path.
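The comparator in the quoted diff wraps the type's ordering so that a null compares lower than any other value instead of triggering an exception inside the underlying ordering. The same idea can be sketched in Python as below. `null_safe` and `compare_values` are hypothetical names, and `None` stands in for a JVM null; the review comment above notes that the evaluation path already goes through `nullSafeEval`, so this wrapper illustrates the technique rather than the final patch.

```python
from functools import cmp_to_key


def null_safe(compare):
    """Wrap a comparator so that None sorts before every other value."""
    def wrapped(x, y):
        if x is None and y is None:
            return 0
        if x is None:
            return -1
        if y is None:
            return 1
        return compare(x, y)
    return wrapped


def compare_values(x, y) -> int:
    """Hypothetical base ordering; it would fail if handed None directly."""
    return (x > y) - (x < y)


# Tuples stand in for struct values; None is the null element of the set.
ordered = sorted(
    [(2, "b"), None, (1, "a")],
    key=cmp_to_key(null_safe(compare_values)),
)
```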
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237399670 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with } @transient lazy val set: Set[Any] = child.dataType match { -case _: AtomicType => hset +case t: AtomicType if !t.isInstanceOf[BinaryType] => hset case _: NullType => hset case _ => + val ord = TypeUtils.getInterpretedOrdering(child.dataType) + val ordering = if (hasNull) { +new Ordering[Any] { + override def compare(x: Any, y: Any): Int = { --- End diff -- Thanks! Yeah, I'm updating it per @cloud-fan's idea. We can also use `nullSafeCodeGen` for the codegen path; I'll update that as well.
[GitHub] spark issue #23176: [SPARK-26211][SQL] Fix InSet for binary, and struct and ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23176 cc @cloud-fan @gatorsmile
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23176

[SPARK-26211][SQL] Fix InSet for binary, and struct and array with null.

## What changes were proposed in this pull request?

Currently `InSet` doesn't work properly for binary type, or for struct and array types with a null value in the set. For binary type, the `HashSet` doesn't work properly for `Array[Byte]`. For struct and array types with a null value in the set, the `ordering` throws an `NPE`.

## How was this patch tested?

Added a few tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26211/inset

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23176.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #23176

commit 277c48f63b9d36028b5dfbb3c850418713197cc4
Author: Takuya UESHIN
Date: 2018-11-29T06:47:29Z

Fix InSet for binary, and struct and array with null.
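The binary half of this problem comes from JVM arrays inheriting identity-based `equals` and `hashCode`, so two `Array[Byte]` values holding the same bytes count as different members of a hash set. A rough Python analogy is sketched below, assuming a hypothetical `JvmLikeArray` class that, like a JVM array, defines no content-based equality. It is only an analogy for the failure mode and for a content-based lookup, not a rendering of Spark's code.

```python
class JvmLikeArray:
    """Hypothetical stand-in for a JVM array.

    It defines neither __eq__ nor __hash__, so instances are compared and
    hashed by identity, as Array[Byte] is on the JVM.
    """

    def __init__(self, data: bytes):
        self.data = data


hash_set = {JvmLikeArray(b"\x01\x02")}
probe = JvmLikeArray(b"\x01\x02")

# Same bytes, different object: the identity-based lookup misses.
found_by_identity = probe in hash_set

# Comparing by content instead finds the element.
content_set = {member.data for member in hash_set}
found_by_content = probe.data in content_set
```

Here `found_by_identity` is false while `found_by_content` is true, which mirrors why the patch routes binary values away from the plain hash set.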
[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23031 Jenkins, retest this please.
[GitHub] spark pull request #23031: [SPARK-26060][SQL] Track SparkConf entries and ma...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/23031#discussion_r237071928 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala --- @@ -154,5 +154,9 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { if (SQLConf.staticConfKeys.contains(key)) { throw new AnalysisException(s"Cannot modify the value of a static config: $key") } +if (sqlConf.setCommandRejectsSparkConfs && +ConfigEntry.findEntry(key) != null && !SQLConf.sqlConfEntries.containsKey(key)) { --- End diff -- I'm sorry for the delay. As per the comment https://github.com/apache/spark/pull/22887#issuecomment-442425557, I'd leave it as is for now.
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236973308 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala --- @@ -19,6 +19,12 @@ package org.apache.spark.sql.catalyst.util import java.util.{Map => JavaMap} +/** + * A simple `MapData` implementation which is backed by 2 arrays. + * + * Note that, user is responsible to guarantee that the key array does not have duplicated + * elements, otherwise the behavior is undefined. --- End diff -- nit: we might need to add the same note to the 3rd and 4th `ArrayBasedMapData.apply()` methods.
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236955791 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + assert(keyType != NullType, "map key cannot be null type.") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] --- End diff -- We need to exempt `BinaryType` from `AtomicType` here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236958252 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow} +import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType} +import org.apache.spark.unsafe.Platform + +class ArrayBasedMapBuilderSuite extends SparkFunSuite { + + test("basic") { +val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType) +builder.put(1, 1) +builder.put(InternalRow(2, 2)) +builder.putAll(new GenericArrayData(Seq(3)), new GenericArrayData(Seq(3))) +val map = builder.build() +assert(map.numElements() == 3) +assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 1, 2 -> 2, 3 -> 3)) + } + + test("fail with null key") { +val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType) +builder.put(1, null) // null value is OK +val e = intercept[RuntimeException](builder.put(null, 1)) +assert(e.getMessage.contains("Cannot use null as map key")) + } + + test("remove duplicated keys with last wins policy") { +val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType) +builder.put(1, 1) +builder.put(2, 2) +builder.put(1, 2) +val map = builder.build() +assert(map.numElements() == 2) +assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 2, 2 -> 2)) + } + + test("struct type key") { +val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType) +builder.put(InternalRow(1), 1) +builder.put(InternalRow(2), 2) +val unsafeRow = { + val row = new UnsafeRow(1) + val bytes = new Array[Byte](16) + row.pointTo(bytes, 16) + row.setInt(0, 1) + row +} +builder.put(unsafeRow, 3) +val map = builder.build() +assert(map.numElements() == 2) +assert(ArrayBasedMapData.toScalaMap(map) == Map(InternalRow(1) -> 3, InternalRow(2) -> 2)) + } + + test("array type key") { +val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType) +builder.put(new GenericArrayData(Seq(1, 1)), 1) +builder.put(new 
GenericArrayData(Seq(2, 2)), 2) +val unsafeArray = { + val array = new UnsafeArrayData() + val bytes = new Array[Byte](24) + Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2) + array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24) + array.setInt(0, 1) + array.setInt(1, 1) + array +} +builder.put(unsafeArray, 3) +val map = builder.build() +assert(map.numElements() == 2) +assert(ArrayBasedMapData.toScalaMap(map) == + Map(new GenericArrayData(Seq(1, 1)) -> 3, new GenericArrayData(Seq(2, 2)) -> 2)) + } --- End diff -- We should have a binary type key test as well? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
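The suite quoted above pins down two behaviors of the builder: a null key is rejected, and a duplicated key keeps the last value written. A minimal Python sketch of that policy follows. `LastWinsMapBuilder` is a hypothetical class that tracks each key's position in a dictionary; it only illustrates the policy the tests describe and is not a port of `ArrayBasedMapBuilder`.

```python
class LastWinsMapBuilder:
    """Hypothetical builder: rejects None keys, keeps the last value per key."""

    def __init__(self):
        self._keys = []
        self._values = []
        self._key_to_index = {}  # key -> position in the two parallel lists

    def put(self, key, value):
        if key is None:
            raise ValueError("Cannot use null as map key")
        index = self._key_to_index.get(key)
        if index is None:
            # First time this key is seen: append to both parallel lists.
            self._key_to_index[key] = len(self._keys)
            self._keys.append(key)
            self._values.append(value)
        else:
            # Duplicate key: overwrite in place, so the last value wins.
            self._values[index] = value

    def build(self):
        return dict(zip(self._keys, self._values))


builder = LastWinsMapBuilder()
builder.put(1, 1)
builder.put(2, 2)
builder.put(1, 2)  # replaces the value stored for key 1
result = builder.build()
```

A dictionary lookup like this relies on keys hashing by content, which is the reason the review asks for `BinaryType` to be handled separately and for a binary-key test.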
[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23101 Jenkins, retest this please.
[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23101 Also cc @wangyum as the owner of [SPARK-25330](https://issues.apache.org/jira/browse/SPARK-25330).
[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23101 cc @srowen
[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23101 I guess we need to update the `dev/deps/spark-deps-hadoop-2.7` file as well.
[GitHub] spark issue #23082: [SPARK-26112][SQL] Update since versions of new built-in...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23082 Jenkins, retest this please.
[GitHub] spark issue #23082: [SPARK-26112][SQL] Update since versions of new built-in...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23082 cc @cloud-fan @gatorsmile @dongjoon-hyun
[GitHub] spark pull request #23082: [SPARK-26112][SQL] Update since versions of new b...
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23082

[SPARK-26112][SQL] Update since versions of new built-in functions.

## What changes were proposed in this pull request?

The following 5 functions were removed from branch-2.4:

- map_entries
- map_filter
- transform_values
- transform_keys
- map_zip_with

We should update the since version to 3.0.0.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26112/since

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23082.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #23082

commit f26db66986a12049e14d1b234840b66f0b96767f
Author: Takuya UESHIN
Date: 2018-11-19T06:36:38Z

Update since version to 3.0.0.
[GitHub] spark issue #23045: [SPARK-26071][SQL] disallow map as map key
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23045 LGTM.
[GitHub] spark pull request #23045: [SPARK-26071][SQL] disallow map as map key
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23045#discussion_r234502854

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -521,13 +521,18 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp
 case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpression {

   override def checkInputDataTypes(): TypeCheckResult = {
-    var funcName = s"function $prettyName"
+    val funcName = s"function $prettyName"
     if (children.exists(!_.dataType.isInstanceOf[MapType])) {
       TypeCheckResult.TypeCheckFailure(
         s"input to $funcName should all be of type map, but it's " +
           children.map(_.dataType.catalogString).mkString("[", ", ", "]"))
     } else {
-      TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName)
+      val sameTypeCheck = TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName)
+      if (sameTypeCheck.isFailure) {
+        sameTypeCheck
+      } else {
+        TypeUtils.checkForMapKeyType(dataType.keyType)
--- End diff --

oh, I see. thanks!
[GitHub] spark pull request #23045: [SPARK-26071][SQL] disallow map as map key
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23045#discussion_r234481249

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -521,13 +521,18 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp
 case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpression {

   override def checkInputDataTypes(): TypeCheckResult = {
-    var funcName = s"function $prettyName"
+    val funcName = s"function $prettyName"
     if (children.exists(!_.dataType.isInstanceOf[MapType])) {
       TypeCheckResult.TypeCheckFailure(
         s"input to $funcName should all be of type map, but it's " +
           children.map(_.dataType.catalogString).mkString("[", ", ", "]"))
     } else {
-      TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName)
+      val sameTypeCheck = TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName)
+      if (sameTypeCheck.isFailure) {
+        sameTypeCheck
+      } else {
+        TypeUtils.checkForMapKeyType(dataType.keyType)
--- End diff --

I don't think we need this. The children already should not have map type keys?
[GitHub] spark pull request #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.en...
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23046#discussion_r234073072

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -280,7 +280,7 @@ object ShuffleExchangeExec {
       }
       // The comparator for comparing row hashcode, which should always be Integer.
       val prefixComparator = PrefixComparators.LONG
-      val canUseRadixSort = SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)
+      val canUseRadixSort = SQLConf.get.enableRadixSort
--- End diff --

Ah, yes. To be exact, if users set the config in `SparkConf` before Spark started, it could be read. I'll leave the choice of which branch to backport to up to you and the other reviewers. @jiangxb1987 @cloud-fan
[GitHub] spark issue #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.enableRad...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23046 cc @jiangxb1987 @cloud-fan
[GitHub] spark pull request #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.en...
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23046

[SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.enableRadixSort` instead of `SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)`.

## What changes were proposed in this pull request?

This is a follow-up of #20393. We should read the conf `"spark.sql.sort.enableRadixSort"` from `SQLConf` instead of `SparkConf`, i.e., use `SQLConf.get.enableRadixSort` instead of `SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)`, otherwise the config is never read.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-23207/conf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23046.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #23046

commit d8db5f3759b998e3ef5720f56c1a048c1494a1c1
Author: Takuya UESHIN
Date: 2018-11-15T11:30:13Z

Use `SQLConf.get.enableRadixSort` instead of `SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)`.
[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23031 Jenkins, retest this please.
[GitHub] spark issue #23031: [SPARK-26060][CORE][SQL] Track SparkConf entries and mak...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23031 @vanzin Thanks for letting me know; that's really good to know. I'll update this to use it.
[GitHub] spark issue #23031: [SPARK-26060][CORE][SQL] Track SparkConf entries and mak...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/23031 Jenkins, retest this please.
[GitHub] spark pull request #23031: [SPARK-26060][CORE][SQL] Track SparkConf entries ...
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23031

[SPARK-26060][CORE][SQL] Track SparkConf entries and make SET command reject such entries.

## What changes were proposed in this pull request?

Currently the `SET` command works without any warning even if the specified key is a `SparkConf` entry. It has no effect because the command does not update `SparkConf`, and this behavior might confuse users. We should track `SparkConf` entries and make the command reject such entries.

## How was this patch tested?

Added a test and existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26060/set_command

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23031.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #23031

commit e8de7bb6847486ab9416bda8c97e8b0ce6c3bbe2
Author: Takuya UESHIN
Date: 2018-11-13T09:59:31Z

Register SparkConfig entries.

commit 33d831e9f69b6a7f5bf965ba7357e73a70ac4239
Author: Takuya UESHIN
Date: 2018-11-13T11:20:25Z

Make SET command reject SparkConf entries.

commit e19b1e430e967d0aeb5fe2a112e3a82ca18c5c80
Author: Takuya UESHIN
Date: 2018-11-14T08:15:09Z

Update the migration guide.
[GitHub] spark pull request #22200: [SPARK-25208][SQL] Loosen Cast.forceNullable for ...
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22200#discussion_r232608144

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---
@@ -154,6 +154,15 @@ object Cast {
     fromPrecedence >= 0 && fromPrecedence < toPrecedence
   }

+  def canNullSafeCastToDecimal(from: DataType, to: DecimalType): Boolean = from match {
+    case from: BooleanType if to.isWiderThan(DecimalType.BooleanDecimal) => true
+    case from: NumericType if to.isWiderThan(from) => true
+    case from: DecimalType =>
+      // truncating or precision lose
+      (to.precision - to.scale) > (from.precision - from.scale)
--- End diff --

In this case we need rounding, so we need an extra digit of precision to avoid overflow. E.g., casting 99.95 of Decimal(4, 2) to Decimal(3, 1) rounds to 100.0, which overflows and ends up as null. We need Decimal(4, 1) to be null-safe.
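The rounding overflow described in this comment can be reproduced outside Spark. Below is a minimal sketch using Python's standard `decimal` module; `cast_decimal` is a hypothetical helper that only imitates the null-on-overflow behaviour (returning `None`), and the half-up rounding mode is an assumption, not a statement about Spark's implementation.

```python
from decimal import Decimal, ROUND_HALF_UP


def cast_decimal(value, precision, scale):
    """Hypothetical stand-in for a cast to Decimal(precision, scale).

    Rounds `value` to `scale` fractional digits and returns None when the
    rounded value no longer fits in `precision` total digits.
    """
    quantum = Decimal(1).scaleb(-scale)  # scale=1 gives a 0.1 quantum
    rounded = value.quantize(quantum, rounding=ROUND_HALF_UP)
    # Number of digits in the unscaled value after rounding.
    digits = len(rounded.as_tuple().digits)
    return rounded if digits <= precision else None


# 99.95 rounds up to 100.0, which needs 4 digits and overflows Decimal(3, 1).
print(cast_decimal(Decimal("99.95"), 3, 1))
# With one extra digit of precision the same cast fits.
print(cast_decimal(Decimal("99.95"), 4, 1))
```

The second call shows why the target needs one more integral digit than `to.precision - to.scale >= from.precision - from.scale` would suggest when the scale shrinks.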
[GitHub] spark pull request #22969: [SPARK-22827][SQL][FOLLOW-UP] Throw `SparkOutOfMe...
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22969#discussion_r231783323

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -787,7 +789,7 @@ case class HashAggregateExec(
        |$unsafeRowKeys, ${hashEval.value});
        | if ($unsafeRowBuffer == null) {
        |// failed to allocate the first page
-       |throw new OutOfMemoryError("No enough memory for aggregation");
+       |throw new $oomeClassName("No enough memory for aggregation");
--- End diff --

Yes, I think so based on my investigation. I grepped for "OutOfMemoryError" and checked the suspicious places.
[GitHub] spark issue #22969: [SPARK-22827][SQL][FOLLOW-UP] Throw `SparkOutOfMemoryErr...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22969 cc @sitalkedia @cloud-fan @gatorsmile
[GitHub] spark pull request #22969: [SPARK-22827][SQL][FOLLOW-UP] Throw `SparkOutOfMe...
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/22969

[SPARK-22827][SQL][FOLLOW-UP] Throw `SparkOutOfMemoryError` in `HashAggregateExec`, too.

## What changes were proposed in this pull request?

This is a follow-up pr of #20014 which introduced `SparkOutOfMemoryError` to avoid killing the entire executor when an `OutOfMemoryError` is thrown. We should throw `SparkOutOfMemoryError` in `HashAggregateExec`, too.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-22827/oome

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22969.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22969

commit f07ab0938563fe63dd20fa756543b14478a27c2f
Author: Takuya UESHIN
Date: 2018-11-08T04:59:35Z

Throw `SparkOutOfMemoryError` in `HashAggregateExec`, too.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231408205

--- Diff: python/pyspark/sql/tests.py ---
@@ -6323,6 +6333,33 @@ def ordered_window(self):
     def unpartitioned_window(self):
         return Window.partitionBy()

+    @property
+    def sliding_row_window(self):
+        return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+    @property
+    def sliding_range_window(self):
+        from pyspark.sql.functions import lit
+        return Window.partitionBy('id').orderBy('v').rangeBetween(lit(-2.0), lit(4.0))
+
+    @property
+    def growing_row_window(self):
+        return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)
+
+    @property
+    def growing_range_window(self):
+        return Window.partitionBy('id').orderBy('v') \
+            .rangeBetween(F.unboundedPreceding(), F.lit(4.0))
--- End diff --

Seems like `F.unboundedFollowing()` was removed at #22841.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r231420974 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -73,68 +118,147 @@ case class WindowInPandasExec( } /** - * Create the resulting projection. - * - * This method uses Code Generation. It can only be used on the executor side. + * Helper function to get all relevant helper functions and data structures for window bounds * - * @param expressions unbound ordered function expressions. - * @return the final resulting projection. + * This function returns: + * (1) Total number of window bound indices in the python input row + * (2) Function from frame index to its lower bound column index in the python input row + * (3) Function from frame index to its upper bound column index in the python input row + * (4) Function that returns a frame requires window bound indices in the python input row + * (unbounded window doesn't need it) + * (5) Function from frame index to its eval type */ - private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { -val references = expressions.zipWithIndex.map { case (e, i) => - // Results of window expressions will be on the right side of child's output - BoundReference(child.output.size + i, e.dataType, e.nullable) + private def computeWindowBoundHelpers( + factories: Seq[InternalRow => WindowFunctionFrame] + ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = { +val dummyRow = new SpecificInternalRow() +val functionFrames = factories.map(_(dummyRow)) + +val evalTypes = functionFrames.map { + case _: UnboundedWindowFunctionFrame => PythonEvalType.SQL_UNBOUNDED_WINDOW_AGG_PANDAS_UDF + case _ => PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF +} + +val requiredIndices = functionFrames.map { + case _: UnboundedWindowFunctionFrame => 0 + case _ => 2 } -val unboundToRefMap = expressions.zip(references).toMap -val 
patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) -UnsafeProjection.create( - child.output ++ patchedWindowExpression, - child.output) + +val upperBoundIndices = requiredIndices.scan(0)(_ + _).tail + +val boundIndices = (requiredIndices zip upperBoundIndices).map {case (num, upperBoundIndex) => --- End diff -- nit: `.map { case`. Also we might prefer `requiredIndices.zip(upperBoundIndices)`.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231427886

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -73,68 +118,147 @@ case class WindowInPandasExec(
   }

   /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor side.
+   * Helper function to get all relevant helper functions and data structures for window bounds
    *
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
+   * This function returns:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the python input row
+   * (3) Function from frame index to its upper bound column index in the python input row
+   * (4) Function that returns a frame requires window bound indices in the python input row
+   *     (unbounded window doesn't need it)
+   * (5) Function from frame index to its eval type
    */
-  private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = {
-    val references = expressions.zipWithIndex.map { case (e, i) =>
-      // Results of window expressions will be on the right side of child's output
-      BoundReference(child.output.size + i, e.dataType, e.nullable)
+  private def computeWindowBoundHelpers(
+      factories: Seq[InternalRow => WindowFunctionFrame]
+  ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = {
+    val dummyRow = new SpecificInternalRow()
--- End diff --

This is just for calculating the `FunctionFrame` in advance, right? I'm wondering whether there is a simpler way to find out whether the function frame is unbounded.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231407486

--- Diff: python/pyspark/sql/tests.py ---
@@ -6323,6 +6333,33 @@ def ordered_window(self):
     def unpartitioned_window(self):
         return Window.partitionBy()

+    @property
+    def sliding_row_window(self):
+        return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+    @property
+    def sliding_range_window(self):
+        from pyspark.sql.functions import lit
+        return Window.partitionBy('id').orderBy('v').rangeBetween(lit(-2.0), lit(4.0))
--- End diff --

`rangeBetween(column, column)` is no longer supported; it was removed in #22870.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r231429605 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -73,68 +118,147 @@ case class WindowInPandasExec( } /** - * Create the resulting projection. - * - * This method uses Code Generation. It can only be used on the executor side. + * Helper function to get all relevant helper functions and data structures for window bounds * - * @param expressions unbound ordered function expressions. - * @return the final resulting projection. + * This function returns: + * (1) Total number of window bound indices in the python input row + * (2) Function from frame index to its lower bound column index in the python input row + * (3) Function from frame index to its upper bound column index in the python input row + * (4) Function that returns a frame requires window bound indices in the python input row + * (unbounded window doesn't need it) + * (5) Function from frame index to its eval type */ - private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { -val references = expressions.zipWithIndex.map { case (e, i) => - // Results of window expressions will be on the right side of child's output - BoundReference(child.output.size + i, e.dataType, e.nullable) + private def computeWindowBoundHelpers( + factories: Seq[InternalRow => WindowFunctionFrame] + ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = { +val dummyRow = new SpecificInternalRow() +val functionFrames = factories.map(_(dummyRow)) + +val evalTypes = functionFrames.map { + case _: UnboundedWindowFunctionFrame => PythonEvalType.SQL_UNBOUNDED_WINDOW_AGG_PANDAS_UDF + case _ => PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF +} + +val requiredIndices = functionFrames.map { + case _: UnboundedWindowFunctionFrame => 0 + case _ => 2 } -val unboundToRefMap = expressions.zip(references).toMap -val 
patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) -UnsafeProjection.create( - child.output ++ patchedWindowExpression, - child.output) + +val upperBoundIndices = requiredIndices.scan(0)(_ + _).tail + +val boundIndices = (requiredIndices zip upperBoundIndices).map {case (num, upperBoundIndex) => +if (num == 0) { + // Sentinel values for unbounded window + (-1, -1) +} else { + (upperBoundIndex - 2, upperBoundIndex - 1) +} +} + +def lowerBoundIndex(frameIndex: Int) = boundIndices(frameIndex)._1 +def upperBoundIndex(frameIndex: Int) = boundIndices(frameIndex)._2 +def frameEvalType(frameIndex: Int) = evalTypes(frameIndex) +def frameRequireIndex(frameIndex: Int) = + evalTypes(frameIndex) == PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF + +(requiredIndices.sum, lowerBoundIndex, upperBoundIndex, frameRequireIndex, frameEvalType) } protected override def doExecute(): RDD[InternalRow] = { -val inputRDD = child.execute() +// Unwrap the expressions and factories from the map. +val expressionsWithFrameIndex = + windowFrameExpressionFactoryPairs.map(_._1).zipWithIndex.flatMap { +case (buffer, frameIndex) => buffer.map( expr => (expr, frameIndex)) + } + +val expressions = expressionsWithFrameIndex.map(_._1) +val expressionIndexToFrameIndex = + expressionsWithFrameIndex.map(_._2).zipWithIndex.map(_.swap).toMap + +val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray + +val (numBoundIndices, lowerBoundIndex, upperBoundIndex, frameRequireIndex, frameEvalType) = + computeWindowBoundHelpers(factories) + +val funcEvalTypes = expressions.indices.map( + i => frameEvalType(expressionIndexToFrameIndex(i))) + +val numFrames = factories.length + +val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold +val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold --- End diff -- `conf` should work instead of `sqlContext.conf`? 
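For readers following the `computeWindowBoundHelpers` diff quoted above, the column layout it computes can be sketched in a few lines of Python. This is only an illustration of the running-sum logic in the quoted Scala (`scan(0)(_ + _).tail`); the function name `bound_index_layout` and the boolean input are hypothetical and not part of the PR.

```python
from itertools import accumulate


def bound_index_layout(frame_is_unbounded):
    """Hypothetical sketch of the bound-column layout for a list of frames.

    Returns the total number of bound columns and, per frame, the
    (lower, upper) column indices in the Python input row.
    """
    # A bounded frame needs two columns (lower, upper); an unbounded one needs none.
    required = [0 if unbounded else 2 for unbounded in frame_is_unbounded]
    # Running total: the exclusive end column of each frame's bound pair.
    ends = list(accumulate(required))
    bounds = [
        (-1, -1) if num == 0 else (end - 2, end - 1)  # (-1, -1) is the sentinel
        for num, end in zip(required, ends)
    ]
    return sum(required), bounds


# Frames w1 (bounded), w2 (unbounded), w3 (bounded).
total, bounds = bound_index_layout([False, True, False])
print(total, bounds)
```

For three frames where only the second is unbounded, the first and third frames get consecutive column pairs and the unbounded frame gets the sentinel pair, so the bound columns precede the data columns without gaps.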
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r231416643 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * It is very similar to [[WindowExec]] and has similar logic. The main difference is that this + * node doesn't not compute any window aggregation values. Instead, it computes the lower and + * upper bound for each window (i.e. window bounds) and pass the data and indices to python work + * to do the actual window aggregation. + * + * It currently materialize all data associated with the same partition key and pass them to + * Python. This is not strictly necessary for sliding windows and can be improved (by slicing + * data into overlapping small chunks and stitch them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the python worker. 
+ * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the python input because its unbounded window + * so it's bound indices will always be the same. + * + * Unbounded window also have a different eval type, because: + * (1) It doesn't have bound indices as input + * (2) The udf only needs to be evaluated once the in python worker (because the udf is + * deterministic and window bounds are the same for all windows) + * + * The logic to compute window bounds is delegated to [[WindowFunctionFrame]] and shared with + * [[WindowExec]] + * + * Note this doesn't support partial aggregation and all aggregation is computed from the entire + * window. + */ case class WindowInPandasExec( windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], -child: SparkPlan) extends UnaryExecNode { +child: SparkPlan +) extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) { --- End diff -- nit: style ``` child: SparkPlan) extends ... ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231413246

--- Diff: python/pyspark/worker.py ---
@@ -154,6 +154,47 @@ def wrapped(*series):
     return lambda *a: (wrapped(*a), arrow_return_type)


+def wrap_bounded_window_agg_pandas_udf(f, return_type):
+    arrow_return_type = to_arrow_type(return_type)
+
+    def wrapped(begin_index, end_index, *series):
+        import numpy as np
+        import pandas as pd
+        result = []
+        for i in range(0, len(begin_index)):
+            begin = begin_index[i]
+            end = end_index[i]
+            range_index = np.arange(begin, end)
+            # Note: Create a slice from a series is actually pretty expensive to
+            #       do for each window. However, there is no way to reduce/eliminate
+            #       the cost of creating sub series here AFAIK.
+            # TODO: s.take might be the best way to create sub series
+            series_slices = [s.take(range_index) for s in series]
--- End diff --

We can also use `s[begin:end]`?
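The suggestion rests on each window being a contiguous `[begin, end)` range, so a slice selects the same rows as taking every position in that range. A rough sketch of the loop with plain Python lists standing in for pandas Series (an assumption made to keep the example dependency-free; `bounded_window_agg` is a hypothetical name, not the function in `worker.py`):

```python
def bounded_window_agg(f, begin_index, end_index, *columns):
    """Hypothetical sketch: evaluate `f` once per window over sliced columns."""
    result = []
    for begin, end in zip(begin_index, end_index):
        # The window is contiguous, so a slice is equivalent to taking
        # positions begin, begin + 1, ..., end - 1 one by one.
        slices = [col[begin:end] for col in columns]
        result.append(f(*slices))
    return result


def mean(values):
    return sum(values) / len(values)


v = [1.0, 2.0, 3.0, 4.0]
# One (begin, end) pair per row, e.g. a sliding window of rows (-1, +1).
print(bounded_window_agg(mean, [0, 0, 1, 2], [2, 3, 4, 4], v))
```

With real pandas Series, `s.iloc[begin:end]` is the explicitly positional form of the same slice; whether it is cheaper than `s.take(...)` would need to be measured.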
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22305 I'll do a review too, hopefully this week. Sorry for the delay.
[GitHub] spark issue #22928: [SPARK-25926][CORE] Move config entries in core module t...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22928 Jenkins, retest this please.
[GitHub] spark pull request #22913: [SPARK-25902][SQL] Add support for dates with mil...
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22913#discussion_r230635196

--- Diff: sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java ---
@@ -414,6 +416,21 @@ final int getInt(int rowId) {
     }
   }

+  private static class DateMilliAccessor extends ArrowVectorAccessor {
+
+    private final DateMilliVector accessor;
+
+    DateMilliAccessor(DateMilliVector vector) {
+      super(vector);
+      this.accessor = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
--- End diff --

We should use `getInt()` to represent the number of days from 1970-01-01 if we map the type to date type.
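If the millisecond value were mapped to a date as suggested, the accessor would have to turn epoch milliseconds into a day count. A minimal sketch of that arithmetic, assuming the stored value is milliseconds since 1970-01-01 UTC; `millis_to_days` is a hypothetical helper, not code from the PR.

```python
from datetime import date, timedelta

MILLIS_PER_DAY = 24 * 60 * 60 * 1000


def millis_to_days(millis):
    """Hypothetical helper: epoch milliseconds to whole days since 1970-01-01."""
    # Floor division keeps pre-1970 instants on the earlier calendar day;
    # any time-of-day component is discarded.
    return millis // MILLIS_PER_DAY


days = millis_to_days(1541393682000)  # an instant on 2018-11-05 (UTC)
print(days, date(1970, 1, 1) + timedelta(days=days))
```

The discarded remainder is exactly the hours, minutes, and seconds, which is the information lost when such a value is treated as a date rather than a timestamp.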
[GitHub] spark pull request #22942: [SPARK-25884][SQL][FOLLOW-UP] Add sample.json bac...
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/22942

[SPARK-25884][SQL][FOLLOW-UP] Add sample.json back.

## What changes were proposed in this pull request?

This is a follow-up pr of #22892 which moved `sample.json` from hive module to sql module, but we still need the file in hive module.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-25884/sample.json

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22942.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22942

commit 18ccff15a771d3e0221b49114ff300b0ef41a25b
Author: Takuya UESHIN
Date: 2018-11-05T04:54:42Z

Add sample.json back.
[GitHub] spark issue #22942: [SPARK-25884][SQL][FOLLOW-UP] Add sample.json back.
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22942 cc @srowen @cloud-fan
[GitHub] spark issue #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT, and us...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22892 Seems like we still need `sample.json` in hive module. I'll submit a follow-up pr.
[GitHub] spark pull request #22913: [SPARK-25902][SQL] Add support for dates with mil...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22913#discussion_r230628333 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala --- @@ -71,6 +71,7 @@ object ArrowUtils { case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale) case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND => TimestampType +case date: ArrowType.Date if date.getUnit == DateUnit.MILLISECOND => TimestampType --- End diff -- Notice that Spark doesn't have date type with milliseconds, so if we want to map to date type, the hours, minutes, ... will be lost. Otherwise we have to map to timestamp type. Which is the proper behavior? cc @BryanCutler --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
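The trade-off raised in this comment can be made concrete with a small sketch. It is written in Python purely for illustration (the PR itself is Scala/Java), and the helper names `millis_to_days` and `millis_to_micros` are hypothetical, not part of Spark or Arrow. It assumes the Arrow value is a count of milliseconds since 1970-01-01 UTC: mapping to a date type keeps only whole days, while mapping to a timestamp type keeps the time of day.

```python
from datetime import date, datetime, timedelta, timezone

MILLIS_PER_DAY = 24 * 60 * 60 * 1000


def millis_to_days(millis):
    """Hypothetical helper: whole days since 1970-01-01 (time of day is dropped).

    Floor division keeps pre-epoch values on the correct (earlier) day.
    """
    return millis // MILLIS_PER_DAY


def millis_to_micros(millis):
    """Hypothetical helper: microseconds since the epoch (nothing is dropped)."""
    return millis * 1000


# An arbitrary instant with a non-zero time of day.
instant = datetime(2018, 11, 5, 4, 54, 42, tzinfo=timezone.utc)
millis = int(instant.timestamp() * 1000)

as_date = date(1970, 1, 1) + timedelta(days=millis_to_days(millis))
time_of_day_micros = millis_to_micros(millis) % (MILLIS_PER_DAY * 1000)
```

Here `as_date` carries only the calendar day, while `time_of_day_micros` is non-zero, which is the information a date mapping would discard.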
[GitHub] spark issue #22928: [SPARK-25926][CORE] Move config entries in core module t...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22928 cc @jiangxb1987 @cloud-fan @gatorsmile
[GitHub] spark pull request #22928: [SPARK-25926][CORE] Move config entries in core m...
GitHub user ueshin opened a pull request: https://github.com/apache/spark/pull/22928 [SPARK-25926][CORE] Move config entries in core module to internal.config. ## What changes were proposed in this pull request? Currently definitions of config entries in `core` module are in several files separately. We should move them into `internal/config` to be easy to manage. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ueshin/apache-spark issues/SPARK-25926/single_config_file Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22928.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22928 commit e1dfa2dc2ecf9423e5e5eeb9825d45c59188afa5 Author: Takuya UESHIN Date: 2018-11-01T04:28:30Z Move configs for history server to internal.config. commit 65371025b04376ecf72e379beabb885cbb252b88 Author: Takuya UESHIN Date: 2018-11-01T04:37:51Z Move configs for status to internal.config. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT,...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22892#discussion_r229737749 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala --- @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.{AnalysisException, ShowCreateTableSuite} +import org.apache.spark.sql.hive.test.TestHiveSingleton + +class HiveShowCreateTableSuite extends ShowCreateTableSuite with TestHiveSingleton { + + test("simple hive table") { +withTable("t1") { + sql( +s"""CREATE TABLE t1 ( + | c1 INT COMMENT 'bla', + | c2 STRING + |) + |TBLPROPERTIES ( + | 'prop1' = 'value1', + | 'prop2' = 'value2' + |) + """.stripMargin + ) + + checkCreateTable("t1") +} + } + + test("simple external hive table") { +withTempDir { dir => + withTable("t1") { +sql( + s"""CREATE TABLE t1 ( + | c1 INT COMMENT 'bla', + | c2 STRING + |) + |LOCATION '${dir.toURI}' + |TBLPROPERTIES ( + | 'prop1' = 'value1', + | 'prop2' = 'value2' + |) + """.stripMargin +) + +checkCreateTable("t1") + } +} + } + + test("partitioned hive table") { --- End diff -- Yes, we have tests `"partitioned data source table"`, `"bucketed data source table"`, and `"partitioned bucketed data source table"` in `ShowCreateTableSuite`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT,...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22892#discussion_r229278709 --- Diff: sql/core/src/test/resources/sample.json --- @@ -0,0 +1,2 @@ +{"a" : "2" ,"b" : "blah", "c_!@(3)":1} --- End diff -- It is needed to run a test "data source table with user specified schema". Actually we just needed to move the file. I'll remove the extra one. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT,...
GitHub user ueshin opened a pull request:

    https://github.com/apache/spark/pull/22892

[SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT, and use LOCATION when SHOW CREATE TABLE.

## What changes were proposed in this pull request?

When running `SHOW CREATE TABLE` for data source tables, we are missing `TBLPROPERTIES` and `COMMENT`, and we should use `LOCATION` instead of the path in `OPTIONS`.

## How was this patch tested?

Split `ShowCreateTableSuite` to confirm that it works with both `InMemoryCatalog` and `HiveExternalCatalog`, and added some tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ueshin/apache-spark issues/SPARK-25884/show_create_table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22892.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22892

commit 2ca37061c37edb400c5779009c86abfb492032ac
Author: Takuya UESHIN
Date: 2018-10-30T06:33:46Z

    Split ShowCreateTableSuite.

commit c2dce69370415f64c4ed802e5bdc241f1c868596
Author: Takuya UESHIN
Date: 2018-10-30T09:55:06Z

    Add TBLPROPERTIES and COMMENT, and use LOCATION.
[GitHub] spark issue #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT, and us...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22892 cc @cloud-fan
[GitHub] spark issue #22816: [SPARK-25822][PySpark]Fix a race condition when releasin...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22816 Thanks! merging to master/2.4/2.3.
[GitHub] spark issue #22816: [SPARK-25822][PySpark]Fix a race condition when releasin...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22816 LGTM.
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r226624218 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala --- @@ -207,6 +207,14 @@ class SessionCatalog( "you cannot create a database with this name.") } validateName(dbName) +// SPARK-25464 fail if DB location exists and is not empty +val dbPath = new Path(dbDefinition.locationUri) +val fs = dbPath.getFileSystem(hadoopConf) +if (!externalCatalog.databaseExists(dbName) && fs.exists(dbPath) + && fs.listStatus(dbPath).nonEmpty) { --- End diff -- Oh, btw, I noticed that @gatorsmile is asking for doing the file existence check in the external catalog, not in the session catalog. Could you move this there? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r226536626 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -840,12 +840,19 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } test("create table in default db") { -val catalog = spark.sessionState.catalog -val tableIdent1 = TableIdentifier("tab1", None) -createTable(catalog, tableIdent1) -val expectedTableIdent = tableIdent1.copy(database = Some("default")) -val expectedTable = generateTable(catalog, expectedTableIdent) -checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1)) +var tablePath: Option[URI] = None +try { + val catalog = spark.sessionState.catalog + val tableIdent1 = TableIdentifier("tab1", None) + createTable(catalog, tableIdent1) + val expectedTableIdent = tableIdent1.copy(database = Some("default")) + val expectedTable = generateTable(catalog, expectedTableIdent) + tablePath = Some(expectedTable.location) + checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1)) +} finally { + // This is external table,so it is required to deleted the path --- End diff -- @HyukjinKwon The first one is `e,s` -> `e, s` ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22762: [SPARK-25763][SQL][PYSPARK][TEST] Use more `@cont...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22762#discussion_r226266575 --- Diff: python/pyspark/sql/tests.py --- @@ -225,6 +225,55 @@ def sql_conf(self, pairs): else: self.spark.conf.set(key, old_value) +@contextmanager +def database(self, *databases): +""" +A convenient context manager to test with some specific databases. This drops the given +databases if exist and sets current database to "default" when it exits. +""" +assert hasattr(self, "spark"), "it should have 'spark' attribute, having a spark session." + +if len(databases) == 1 and isinstance(databases[0], (list, set)): --- End diff -- Sure, I'll remove it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22466: [SPARK-25464][SQL] Create Database to the location,only ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22466 Btw, what if `create database if not exists ...`? Seems like an exception will be thrown if the table exists even if we specify `if not exists`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
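One possible ordering that avoids the surprise raised here is to handle the "already exists" case before looking at the location, which is roughly what the `!externalCatalog.databaseExists(dbName)` guard in the quoted `SessionCatalog.scala` diff does. The sketch below is an assumption-laden illustration in Python, not Spark code: `create_database`, the dict used as a catalog, and the error messages are all hypothetical.

```python
import os
import tempfile


def create_database(catalog, name, location, if_not_exists=False):
    """Hypothetical sketch: register `name` in `catalog` (a plain dict here).

    Returns True if the database was created, False if it was a no-op.
    """
    if name in catalog:
        if if_not_exists:
            # IF NOT EXISTS: an existing database is not an error, and its
            # (probably non-empty) location must not be checked at all.
            return False
        raise ValueError("database %s already exists" % name)
    # Only a genuinely new database is rejected for a non-empty location.
    if os.path.isdir(location) and os.listdir(location):
        raise ValueError("location %s exists and is not empty" % location)
    catalog[name] = location
    return True


with tempfile.TemporaryDirectory() as location:
    # Simulate an existing database that already holds data.
    open(os.path.join(location, "part-00000"), "w").close()
    catalog = {"some_db": location}
    created = create_database(catalog, "some_db", location, if_not_exists=True)
```

With this ordering, `created` is `False` and no exception is raised, even though the location is not empty.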
[GitHub] spark issue #22762: [SPARK-25763][SQL][PYSPARK][TEST] Use more `@contextmana...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22762 cc @HyukjinKwon
[GitHub] spark issue #22762: [SPARK-25763][SQL][PYSPARK][TEST] Use more `@contextmana...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22762 Jenkins, retest this please.
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r226245229 --- Diff: python/pyspark/sql/tests.py --- @@ -2993,6 +2990,7 @@ def test_current_database(self): AnalysisException, "does_not_exist", lambda: spark.catalog.setCurrentDatabase("does_not_exist")) +spark.sql("DROP DATABASE some_db") --- End diff -- Submitted #22762. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22762: [SPARK-25763][SQL][PYSPARK][TEST] Use more `@cont...
GitHub user ueshin opened a pull request:

    https://github.com/apache/spark/pull/22762

[SPARK-25763][SQL][PYSPARK][TEST] Use more `@contextmanager` to ensure clean-up each test.

## What changes were proposed in this pull request?

Currently the tests in `SQLTest` in PySpark do not clean up after themselves properly. We should introduce and use more `@contextmanager` helpers so that each test's context is cleaned up reliably.

## How was this patch tested?

Modified tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ueshin/apache-spark issues/SPARK-25763/cleanup_sqltests

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22762.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22762

commit 9f7901b887f8fcd8e03a81e09548bd3525f3072e
Author: Takuya UESHIN
Date: 2018-10-18T09:24:46Z

    Use more `@contextmanager` to ensure clean-up each test.
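As a rough illustration of the clean-up pattern this PR describes, here is a minimal, Spark-free sketch. It is not the code from the PR: `run_sql` is a hypothetical callable standing in for `spark.sql`, and the exact SQL statements are assumptions. The point is only that the `finally` clause runs the clean-up even when the test body raises.

```python
from contextlib import contextmanager


@contextmanager
def database(run_sql, *databases):
    """Hypothetical helper: drop `databases` and reset to "default" on exit."""
    try:
        yield
    finally:
        # Runs whether the body succeeded or raised.
        for db in databases:
            run_sql("DROP DATABASE IF EXISTS %s CASCADE" % db)
        run_sql("USE default")


# Usage: record the statements instead of sending them to a real session.
executed = []
try:
    with database(executed.append, "some_db"):
        executed.append("CREATE DATABASE some_db")
        raise AssertionError("simulated test failure")
except AssertionError:
    pass
```

After the simulated failure, `executed` still ends with the `DROP DATABASE` and `USE default` statements, so a later test would not see a leftover `some_db`.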
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r226236226 --- Diff: python/pyspark/sql/tests.py --- @@ -2993,6 +2990,7 @@ def test_current_database(self): AnalysisException, "does_not_exist", lambda: spark.catalog.setCurrentDatabase("does_not_exist")) +spark.sql("DROP DATABASE some_db") --- End diff -- We still need to clean up after each test. Otherwise, if the test fails before dropping `some_db`, it will cause other tests to fail. The current cleanup does not seem good enough. Let me refine it.
[GitHub] spark issue #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in Pyspark
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21990 LGTM.
[GitHub] spark pull request #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in P...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21990#discussion_r225811960 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala --- @@ -1136,4 +1121,27 @@ object SparkSession extends Logging { SparkSession.clearDefaultSession() } } + + /** + * Initialize extensions if the user has defined a configurator class in their SparkConf. + * This class will be applied to the extensions passed into this function. + */ + private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) { --- End diff -- I see, but in that case, we need to ensure that no injection of extensions is used in the default constructor to avoid initializing without extensions from the conf. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22646#discussion_r225807075 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala --- @@ -1115,9 +1126,38 @@ object SQLContext { }) } } -def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match { - case struct: StructType => createStructConverter(cls, struct.map(_.dataType)) - case _ => CatalystTypeConverters.createToCatalystConverter(dataType) +def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match { + case (cls: Class[_], struct: StructType) => --- End diff -- Reusing `JavaTypeInference.serializerFor` would be great, but currently it behaves a little differently. At least it doesn't support `java.lang.Iterable[_]`, so we can't use it immediately. We need to extend it to support `Iterable` (and also `deserializerFor`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22646#discussion_r225807045 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala --- @@ -1115,9 +1126,38 @@ object SQLContext { }) } } -def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match { - case struct: StructType => createStructConverter(cls, struct.map(_.dataType)) - case _ => CatalystTypeConverters.createToCatalystConverter(dataType) +def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match { --- End diff -- I'm okay to move this to `CatalystTypeConverters `, but note that unfortunately seems like `CatalystTypeConverters` doesn't work properly with nested beans as we are trying to support it here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22646#discussion_r225806457 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala --- @@ -1115,9 +1126,38 @@ object SQLContext { }) } } -def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match { - case struct: StructType => createStructConverter(cls, struct.map(_.dataType)) - case _ => CatalystTypeConverters.createToCatalystConverter(dataType) +def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match { + case (cls: Class[_], struct: StructType) => +// bean type +createStructConverter(cls, struct.map(_.dataType)) + case (arrayType: Class[_], array: ArrayType) if arrayType.isArray => +// array type +val converter = createConverter(arrayType.getComponentType, array.elementType) +value => new GenericArrayData( + (0 until JavaArray.getLength(value)).map(i => +converter(JavaArray.get(value, i))).toArray) + case (_, array: ArrayType) => +// java.util.List type +val cls = classOf[java.util.List[_]] --- End diff -- On second thoughts, we should use `java.lang.Iterable` here. We can convert `Iterable` to `ArrayType`, as `ArrayConverter` tries to. If we use `java.util.List` here, it leads to behavior changes for lists of primitives.
[GitHub] spark pull request #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in P...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21990#discussion_r225762148 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala --- @@ -1136,4 +1121,27 @@ object SparkSession extends Logging { SparkSession.clearDefaultSession() } } + + /** + * Initialize extensions if the user has defined a configurator class in their SparkConf. + * This class will be applied to the extensions passed into this function. + */ + private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) { --- End diff -- Oh, I see, moving to the default constructor was not a good idea. How about the first suggestion? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in P...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21990#discussion_r225439067 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala --- @@ -1136,4 +1121,27 @@ object SparkSession extends Logging { SparkSession.clearDefaultSession() } } + + /** + * Initialize extensions if the user has defined a configurator class in their SparkConf. + * This class will be applied to the extensions passed into this function. + */ + private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) { --- End diff -- On second thoughts, we could move the method call to the top of the default constructor? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22724#discussion_r225389273 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -196,6 +197,48 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(value: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case st: StructType => +v.isInstanceOf[InternalRow] && { + val row = v.asInstanceOf[InternalRow] + st.fields.map(_.dataType).zipWithIndex.forall { +case (dt, i) => doValidate(row.get(i, dt), dt) + } +} + case at: ArrayType => +v.isInstanceOf[GenericArrayData] && { + val ar = v.asInstanceOf[GenericArrayData].array + ar.isEmpty || doValidate(ar.head, at.elementType) +} + case mt: MapType => +v.isInstanceOf[ArrayBasedMapData] && { + val map = v.asInstanceOf[ArrayBasedMapData] + map.numElements() == 0 || { +doValidate(map.keyArray.array.head, mt.keyType) && + doValidate(map.valueArray.array.head, mt.valueType) + } +} + case ObjectType(cls) => cls.isInstance(v) + case udt: UserDefinedType[_] => doValidate(v, udt.sqlType) + case _ => false --- End diff -- We need to add `NullType` case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22724#discussion_r225387436 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -196,6 +197,48 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(value: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case st: StructType => +v.isInstanceOf[InternalRow] && { + val row = v.asInstanceOf[InternalRow] + st.fields.map(_.dataType).zipWithIndex.forall { +case (dt, i) => doValidate(row.get(i, dt), dt) + } +} + case at: ArrayType => +v.isInstanceOf[GenericArrayData] && { + val ar = v.asInstanceOf[GenericArrayData].array + ar.isEmpty || doValidate(ar.head, at.elementType) +} + case mt: MapType => +v.isInstanceOf[ArrayBasedMapData] && { + val map = v.asInstanceOf[ArrayBasedMapData] + map.numElements() == 0 || { +doValidate(map.keyArray.array.head, mt.keyType) && + doValidate(map.valueArray.array.head, mt.valueType) + } +} --- End diff -- I'm wondering whether we don't need to check the whole elements for `ArrayType` and `MapType`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22724#discussion_r225194050 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -196,6 +197,31 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(v: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case _: StructType => v.isInstanceOf[InternalRow] + case _: ArrayType => v.isInstanceOf[ArrayData] + case _: MapType => v.isInstanceOf[MapData] --- End diff -- Should validate recursively for `StructType`, `ArrayType`, and `MapType`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
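The two review points on #22724 (validate nested types recursively, and check every element rather than only the first) can be sketched as follows. This is a Python illustration only, not the Scala code under review: the type descriptors (`("array", elem)`, `("map", key, value)`, `("struct", [fields])`) and the function name `validate` are hypothetical stand-ins for Catalyst's `DataType` classes.

```python
def validate(value, dtype):
    """Hypothetical sketch: True if `value` structurally matches `dtype`.

    `dtype` is either a Python type or a tuple descriptor:
    ("array", elem), ("map", key, value) or ("struct", [field types]).
    """
    if isinstance(dtype, type):
        # bool is a subclass of int in Python, so reject it for int explicitly.
        if dtype is int and isinstance(value, bool):
            return False
        return isinstance(value, dtype)
    kind = dtype[0]
    if kind == "array":
        # Check every element, not just the head.
        return isinstance(value, list) and all(
            validate(v, dtype[1]) for v in value)
    if kind == "map":
        return isinstance(value, dict) and all(
            validate(k, dtype[1]) and validate(v, dtype[2])
            for k, v in value.items())
    if kind == "struct":
        fields = dtype[1]
        return (isinstance(value, tuple)
                and len(value) == len(fields)
                and all(validate(v, t) for v, t in zip(value, fields)))
    return False


# A head-only check would accept this list; the full check rejects it.
mixed_ok = validate([1, "two", 3], ("array", int))
nested_ok = validate((1, [{"k": 1.5}]),
                     ("struct", [int, ("array", ("map", str, float))]))
```

The design choice is a cost trade-off: checking all elements is linear in the size of the literal, whereas a head-only check is constant time but can miss a mismatched element later in the collection.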
[GitHub] spark pull request #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in P...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21990#discussion_r225077270 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala --- @@ -1136,4 +1121,27 @@ object SparkSession extends Logging { SparkSession.clearDefaultSession() } } + + /** + * Initialize extensions if the user has defined a configurator class in their SparkConf. + * This class will be applied to the extensions passed into this function. + */ + private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) { --- End diff --

How about returning `SparkSessionExtensions` from this method, and modifying the secondary constructor of `SparkSession` as follows:

```scala
private[sql] def this(sc: SparkContext) {
  this(sc, None, None,
    SparkSession.applyExtensionsFromConf(sc.getConf, new SparkSessionExtensions))
}
```

I'm a little worried that the order in which we apply extensions might matter.
[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22646#discussion_r224671775 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala --- @@ -1115,9 +1126,38 @@ object SQLContext { }) } } -def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match { - case struct: StructType => createStructConverter(cls, struct.map(_.dataType)) - case _ => CatalystTypeConverters.createToCatalystConverter(dataType) +def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match { + case (cls: Class[_], struct: StructType) => +// bean type +createStructConverter(cls, struct.map(_.dataType)) + case (arrayType: Class[_], array: ArrayType) if arrayType.isArray => +// array type +val converter = createConverter(arrayType.getComponentType, array.elementType) +value => new GenericArrayData( + (0 until JavaArray.getLength(value)).map(i => +converter(JavaArray.get(value, i))).toArray) + case (_, array: ArrayType) => +// java.util.List type +val cls = classOf[java.util.List[_]] --- End diff -- Seems like `JavaTypeInference.inferDataType()` supports `java.lang.Iterable`, not only `List`, but serializer/deserializer don't. Should we change `inferDataType()`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r224667371 --- Diff: python/pyspark/sql/tests.py --- @@ -2993,6 +2990,7 @@ def test_current_database(self): AnalysisException, "does_not_exist", lambda: spark.catalog.setCurrentDatabase("does_not_exist")) +spark.sql("DROP DATABASE some_db") --- End diff -- We should surround with try-finally? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r224666263 --- Diff: python/pyspark/sql/tests.py --- @@ -350,9 +350,6 @@ def test_sqlcontext_reuses_sparksession(self): def tearDown(self): --- End diff -- Now we can remove this method? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r224318955 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala --- @@ -108,6 +108,16 @@ object TestingUDT { } } +object TestingValueClass { + case class IntWrapper(i: Int) extends AnyVal + case class StrWrapper(s: String) extends AnyVal + + case class ValueClassData( +intField: Int, +wrappedInt: IntWrapper, +strField: String, +wrappedStr: StrWrapper) --- End diff -- We might need a comment to describe what this class looks like in Java. It seems to have 2 int fields, `intField` and `wrappedInt`, and 2 string fields, `strField` and `wrappedStr`. I'm not sure it is the same in Scala 2.12, though.
[GitHub] spark pull request #22545: [SPARK-25525][SQL][PYSPARK] Do not update conf fo...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22545#discussion_r223953269 --- Diff: python/pyspark/sql/session.py --- @@ -156,7 +156,7 @@ def getOrCreate(self): default. >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate() ->>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1" +>>> s1.conf.get("k1") == "v1" --- End diff -- Submitted a PR to update the migration guide: #22682.
[GitHub] spark pull request #22682: [SPARK-20946][SPARK-25525][SQL][FOLLOW-UP] Update...
GitHub user ueshin opened a pull request: https://github.com/apache/spark/pull/22682 [SPARK-20946][SPARK-25525][SQL][FOLLOW-UP] Update the migration guide. ## What changes were proposed in this pull request? This is a follow-up PR to #18536 and #22545 to update the migration guide. ## How was this patch tested? Built and checked the doc locally. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ueshin/apache-spark issues/SPARK-20946_25525/migration_guide Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22682.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22682 commit 3bf42af5695729597e1f49b11f50f2fb450ead7f Author: Takuya UESHIN Date: 2018-10-10T06:29:20Z Update the migration guide.
[GitHub] spark issue #22682: [SPARK-20946][SPARK-25525][SQL][FOLLOW-UP] Update the mi...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22682 cc @gatorsmile
[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22527 It seems there is a merge commit in the Apache git repository (https://git-wip-us.apache.org/repos/asf?p=spark.git), but it is not on GitHub yet.
[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22527 Sorry, the merge script failed. Let me try again a little later.
[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22527 I'd merge this so as not to block the follow-up PRs that add support for arrays/lists and maps of beans.