[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-09 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r240092271
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -144,24 +282,107 @@ case class WindowInPandasExec(
 queue.close()
   }
 
-  val inputProj = UnsafeProjection.create(allInputs, child.output)
-  val pythonInput = grouped.map { case (_, rows) =>
-rows.map { row =>
-  queue.add(row.asInstanceOf[UnsafeRow])
-  inputProj(row)
+  val stream = iter.map { row =>
+queue.add(row.asInstanceOf[UnsafeRow])
+row
+  }
+
+  val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+// Manage the stream and the grouping.
+var nextRow: UnsafeRow = null
+var nextGroup: UnsafeRow = null
+var nextRowAvailable: Boolean = false
+private[this] def fetchNextRow() {
+  nextRowAvailable = stream.hasNext
+  if (nextRowAvailable) {
+nextRow = stream.next().asInstanceOf[UnsafeRow]
+nextGroup = grouping(nextRow)
+  } else {
+nextRow = null
+nextGroup = null
+  }
+}
+fetchNextRow()
+
+// Manage the current partition.
+val buffer: ExternalAppendOnlyUnsafeRowArray =
+  new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+var bufferIterator: Iterator[UnsafeRow] = _
+
+val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+val frames = factories.map(_(indexRow))
+
+private[this] def fetchNextPartition() {
+  // Collect all the rows in the current partition.
+  // Before we start to fetch new input rows, make a copy of nextGroup.
+  val currentGroup = nextGroup.copy()
+
+  // clear last partition
+  buffer.clear()
+
+  while (nextRowAvailable && nextGroup == currentGroup) {
--- End diff --

Thanks! Could you submit the PR to fix `WindowExec` please?


---




[GitHub] spark issue #23245: [SPARK-26060][SQL][FOLLOW-UP] Rename the config name.

2018-12-07 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23245
  
Jenkins, retest this please.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-06 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239666242
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -144,24 +282,107 @@ case class WindowInPandasExec(
 queue.close()
   }
 
-  val inputProj = UnsafeProjection.create(allInputs, child.output)
-  val pythonInput = grouped.map { case (_, rows) =>
-rows.map { row =>
-  queue.add(row.asInstanceOf[UnsafeRow])
-  inputProj(row)
+  val stream = iter.map { row =>
+queue.add(row.asInstanceOf[UnsafeRow])
+row
+  }
+
+  val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+// Manage the stream and the grouping.
+var nextRow: UnsafeRow = null
+var nextGroup: UnsafeRow = null
+var nextRowAvailable: Boolean = false
+private[this] def fetchNextRow() {
+  nextRowAvailable = stream.hasNext
+  if (nextRowAvailable) {
+nextRow = stream.next().asInstanceOf[UnsafeRow]
+nextGroup = grouping(nextRow)
+  } else {
+nextRow = null
+nextGroup = null
+  }
+}
+fetchNextRow()
+
+// Manage the current partition.
+val buffer: ExternalAppendOnlyUnsafeRowArray =
+  new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+var bufferIterator: Iterator[UnsafeRow] = _
+
+val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+val frames = factories.map(_(indexRow))
+
+private[this] def fetchNextPartition() {
+  // Collect all the rows in the current partition.
+  // Before we start to fetch new input rows, make a copy of nextGroup.
+  val currentGroup = nextGroup.copy()
+
+  // clear last partition
+  buffer.clear()
+
+  while (nextRowAvailable && nextGroup == currentGroup) {
--- End diff --

oh, we needed to fix the original `WindowExec` as well, didn't we.
How about fixing it in a separate PR first, to get feedback from other reviewers who are familiar with window functions, and then following up with this PR?


---




[GitHub] spark pull request #23245: [SPARK-26060][SQL][FOLLOW-UP] Rename the config n...

2018-12-06 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23245

[SPARK-26060][SQL][FOLLOW-UP] Rename the config name.

## What changes were proposed in this pull request?

This is a follow-up of #23031 to rename the config name.
The `execution` category might not be needed.

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26060/rename_config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23245.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23245


commit 021134cd2b6a0a82ef8ef36a5ce122bff397ab32
Author: Takuya UESHIN 
Date:   2018-12-06T09:24:30Z

Rename the config name.




---




[GitHub] spark issue #23245: [SPARK-26060][SQL][FOLLOW-UP] Rename the config name.

2018-12-06 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23245
  
cc @cloud-fan 


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-05 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239312302
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -144,24 +282,107 @@ case class WindowInPandasExec(
 queue.close()
   }
 
-  val inputProj = UnsafeProjection.create(allInputs, child.output)
-  val pythonInput = grouped.map { case (_, rows) =>
-rows.map { row =>
-  queue.add(row.asInstanceOf[UnsafeRow])
-  inputProj(row)
+  val stream = iter.map { row =>
+queue.add(row.asInstanceOf[UnsafeRow])
+row
+  }
+
+  val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+// Manage the stream and the grouping.
+var nextRow: UnsafeRow = null
+var nextGroup: UnsafeRow = null
+var nextRowAvailable: Boolean = false
+private[this] def fetchNextRow() {
+  nextRowAvailable = stream.hasNext
+  if (nextRowAvailable) {
+nextRow = stream.next().asInstanceOf[UnsafeRow]
+nextGroup = grouping(nextRow)
+  } else {
+nextRow = null
+nextGroup = null
+  }
+}
+fetchNextRow()
+
+// Manage the current partition.
+val buffer: ExternalAppendOnlyUnsafeRowArray =
+  new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+var bufferIterator: Iterator[UnsafeRow] = _
+
+val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+val frames = factories.map(_(indexRow))
+
+private[this] def fetchNextPartition() {
+  // Collect all the rows in the current partition.
+  // Before we start to fetch new input rows, make a copy of nextGroup.
+  val currentGroup = nextGroup.copy()
+
+  // clear last partition
+  buffer.clear()
+
+  while (nextRowAvailable && nextGroup == currentGroup) {
--- End diff --

I guess we have to use `GenerateOrdering.compare()` to support complex types, as `GroupedIterator` does.
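
For reference, a minimal sketch of the `GroupedIterator`-style comparison (the catalyst class and method names are assumed to be available as in the current codebase; this is not the actual patch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder}
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering

    // Compare two input rows on the grouping expressions with a generated ordering,
    // instead of relying on `nextGroup == currentGroup` (UnsafeRow equality).
    def sameGroup(
        groupingExprs: Seq[Expression],
        inputSchema: Seq[Attribute]): (InternalRow, InternalRow) => Boolean = {
      val keyOrdering =
        GenerateOrdering.generate(groupingExprs.map(SortOrder(_, Ascending)), inputSchema)
      (a, b) => keyOrdering.compare(a, b) == 0
    }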


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-05 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239307965
  
--- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py ---
@@ -231,12 +266,10 @@ def test_array_type(self):
 self.assertEquals(result1.first()['v2'], [1.0, 2.0])
 
 def test_invalid_args(self):
-from pyspark.sql.functions import pandas_udf, PandasUDFType
+from pyspark.sql.functions import mean, pandas_udf, PandasUDFType
--- End diff --

ditto.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-05 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239307779
  
--- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py ---
@@ -87,8 +96,34 @@ def ordered_window(self):
 def unpartitioned_window(self):
 return Window.partitionBy()
 
+@property
+def sliding_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+@property
+def sliding_range_window(self):
+return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4)
+
+@property
+def growing_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)
+
+@property
+def growing_range_window(self):
+return Window.partitionBy('id').orderBy('v') \
+.rangeBetween(Window.unboundedPreceding, 4)
+
+@property
+def shrinking_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(-2, Window.unboundedFollowing)
+
+@property
+def shrinking_range_window(self):
+return Window.partitionBy('id').orderBy('v') \
+.rangeBetween(-3, Window.unboundedFollowing)
+
 def test_simple(self):
-from pyspark.sql.functions import mean
+from pyspark.sql.functions import pandas_udf, PandasUDFType, percent_rank, mean, max
--- End diff --

ditto.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-05 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239308506
  
--- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py ---
@@ -245,11 +278,101 @@ def test_invalid_args(self):
foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP)
 df.withColumn('v2', foo_udf(df['v']).over(w))
 
-with QuietTest(self.sc):
-with self.assertRaisesRegexp(
-AnalysisException,
-'.*Only unbounded window frame is supported.*'):
-df.withColumn('mean_v', mean_udf(df['v']).over(ow))
+def test_bounded_simple(self):
+from pyspark.sql.functions import mean, max, min, count
+
+df = self.data
+w1 = self.sliding_row_window
+w2 = self.shrinking_range_window
+
+plus_one = self.python_plus_one
+count_udf = self.pandas_agg_count_udf
+mean_udf = self.pandas_agg_mean_udf
+max_udf = self.pandas_agg_max_udf
+min_udf = self.pandas_agg_min_udf
+
+result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1)) \
+.withColumn('count_v', count_udf(df['v']).over(w2)) \
+.withColumn('max_v',  max_udf(df['v']).over(w2)) \
+.withColumn('min_v', min_udf(df['v']).over(w1))
+
+expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1)) \
+.withColumn('count_v', count(df['v']).over(w2)) \
+.withColumn('max_v', max(df['v']).over(w2)) \
+.withColumn('min_v', min(df['v']).over(w1))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_growing_window(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w1 = self.growing_row_window
+w2 = self.growing_range_window
+
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+.withColumn('m2', mean_udf(df['v']).over(w2))
+
+expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+.withColumn('m2', mean(df['v']).over(w2))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_sliding_window(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w1 = self.sliding_row_window
+w2 = self.sliding_range_window
+
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+.withColumn('m2', mean_udf(df['v']).over(w2))
+
+expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+.withColumn('m2', mean(df['v']).over(w2))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_shrinking_window(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w1 = self.shrinking_row_window
+w2 = self.shrinking_range_window
+
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+.withColumn('m2', mean_udf(df['v']).over(w2))
+
+expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+.withColumn('m2', mean(df['v']).over(w2))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_bounded_mixed(self):
+from pyspark.sql.functions import mean, max, min, count
--- End diff --

We don't need min and count?


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-12-05 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r239307483
  
--- Diff: python/pyspark/sql/tests/test_pandas_udf_window.py ---
@@ -44,9 +44,18 @@ def python_plus_one(self):
 
 @property
 def pandas_scalar_time_two(self):
-from pyspark.sql.functions import pandas_udf
+from pyspark.sql.functions import pandas_udf, PandasUDFType
--- End diff --

nit: we can revert this change?


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238146834
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -64,7 +85,7 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
 i = 0
 while (i < validExprs.length) {
   val (_, ordinal) = validExprs(i)
-  mutableRow(ordinal) = buffer(ordinal)
+  fieldWriters(i)(buffer(ordinal))
--- End diff --

Since `fieldWriters` is accessed by index, should we use an `IndexedSeq` or an `Array` explicitly?
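
A tiny self-contained illustration of the nit (names are hypothetical): indexed access on a `List`-backed `Seq` is linear per lookup, while an `Array` keeps each `fieldWriters(i)` call constant-time in the hot loop.

    // Hypothetical writer type, just to show the access pattern.
    val writersAsSeq: Seq[Any => Unit] = List.fill(1000)((_: Any) => ())
    val writersAsArray: Array[Any => Unit] = writersAsSeq.toArray

    var i = 0
    while (i < writersAsArray.length) {
      writersAsArray(i)(null)   // O(1) here; writersAsSeq(i) would be O(i) on a List
      i += 1
    }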


---




[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...

2018-11-30 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23176#discussion_r237826533
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala ---
@@ -293,6 +293,54 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
 }
   }
 
+  test("INSET: binary") {
--- End diff --

Submitted #23187.


---




[GitHub] spark pull request #23187: [SPARK-26211][SQL][FOLLOW-UP] Combine test cases ...

2018-11-30 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23187

[SPARK-26211][SQL][FOLLOW-UP] Combine test cases for `In` and `InSet`.

## What changes were proposed in this pull request?

This is a follow-up PR of #23176.

`In` and `InSet` are semantically equal, so the tests for `In` should pass with `InSet`, and vice versa.
This combines those test cases.
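
The combined form could look roughly like the hypothetical helper below, defined inside `PredicateSuite` on top of `ExpressionEvalHelper.checkEvaluation` (the exact helper name and shape in the final patch may differ):

    import org.apache.spark.sql.catalyst.expressions.{In, InSet, Literal}

    // Evaluate the same inputs through both In and InSet, since they must agree.
    private def checkInAndInSet(in: In, expected: Any): Unit = {
      checkEvaluation(in, expected)
      checkEvaluation(InSet(in.value, in.list.map(_.eval()).toSet), expected)
    }

    // e.g. checkInAndInSet(In(Literal(1), Seq(Literal(1), Literal(2))), expected = true)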

## How was this patch tested?

The combined tests and existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26211/in_inset_tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23187.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23187


commit a87cb7739a0d9e864d8a03376962aa06a99cc100
Author: Takuya UESHIN 
Date:   2018-11-30T11:00:02Z

Combine test cases for `In` and `InSet`.




---




[GitHub] spark issue #23187: [SPARK-26211][SQL][FOLLOW-UP] Combine test cases for `In...

2018-11-30 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23187
  
cc @gatorsmile @cloud-fan 


---




[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...

2018-11-29 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23176#discussion_r237771176
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala ---
@@ -293,6 +293,54 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
 }
   }
 
+  test("INSET: binary") {
--- End diff --

Sure, I'll do it later. Thanks!


---




[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...

2018-11-29 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23031
  
Jenkins, retest this please.


---




[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...

2018-11-29 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23031
  
Jenkins, retest this please.


---




[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...

2018-11-29 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23031
  
Jenkins, retest this please.


---




[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...

2018-11-29 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23176#discussion_r237400321
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
@@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
   }
 
   @transient lazy val set: Set[Any] = child.dataType match {
-case _: AtomicType => hset
+case t: AtomicType if !t.isInstanceOf[BinaryType] => hset
 case _: NullType => hset
 case _ =>
+  val ord = TypeUtils.getInterpretedOrdering(child.dataType)
+  val ordering = if (hasNull) {
+new Ordering[Any] {
+  override def compare(x: Any, y: Any): Int = {
+if (x == null && y == null) {
+  0
+} else if (x == null) {
+  -1
+} else if (y == null) {
+  1
+} else {
+  ord.compare(x, y)
+}
+  }
+}
+  } else {
+ord
+  }
  // for structs use interpreted ordering to be able to compare UnsafeRows with non-UnsafeRows
-  TreeSet.empty(TypeUtils.getInterpretedOrdering(child.dataType)) ++ hset
+  TreeSet.empty(ordering) ++ hset
--- End diff --

Actually we are using `nullSafeEval`, so we don't need to update it.
Instead, I'm updating it to use `nullSafeCodeGen` for the codegen path.


---




[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...

2018-11-29 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23176#discussion_r237399670
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
@@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
   }
 
   @transient lazy val set: Set[Any] = child.dataType match {
-case _: AtomicType => hset
+case t: AtomicType if !t.isInstanceOf[BinaryType] => hset
 case _: NullType => hset
 case _ =>
+  val ord = TypeUtils.getInterpretedOrdering(child.dataType)
+  val ordering = if (hasNull) {
+new Ordering[Any] {
+  override def compare(x: Any, y: Any): Int = {
--- End diff --

Thanks! Yeah, I'm updating it as per @cloud-fan's idea.
We can also use `nullSafeCodeGen` for the codegen path; I'll update that as well.


---




[GitHub] spark issue #23176: [SPARK-26211][SQL] Fix InSet for binary, and struct and ...

2018-11-28 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23176
  
cc @cloud-fan @gatorsmile 


---




[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...

2018-11-28 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23176

[SPARK-26211][SQL] Fix InSet for binary, and struct and array with null.

## What changes were proposed in this pull request?

Currently `InSet` doesn't work properly for binary type, or for struct and array types with a null value in the set.
For binary type, the `HashSet` doesn't work properly for `Array[Byte]`, and for struct and array types with a null value in the set, the `ordering` will throw an `NPE`.
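
The binary-type half of the problem can be reproduced with plain Scala collections (the struct/array half is the missing null handling in the comparison-based ordering):

    // Array[Byte] uses reference-based equals/hashCode on the JVM, so a hash-based
    // Set never finds an equal-but-distinct byte array:
    val hset: Set[Any] = Set(Array[Byte](1, 2, 3))
    println(hset.contains(Array[Byte](1, 2, 3)))            // false
    println(Array[Byte](1, 2, 3) == Array[Byte](1, 2, 3))   // false, same root cause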

## How was this patch tested?

Added a few tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26211/inset

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23176.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23176


commit 277c48f63b9d36028b5dfbb3c850418713197cc4
Author: Takuya UESHIN 
Date:   2018-11-29T06:47:29Z

Fix InSet for binary, and struct and array with null.




---




[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...

2018-11-28 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23031
  
Jenkins, retest this please.


---




[GitHub] spark pull request #23031: [SPARK-26060][SQL] Track SparkConf entries and ma...

2018-11-28 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23031#discussion_r237071928
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala ---
@@ -154,5 +154,9 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
 if (SQLConf.staticConfKeys.contains(key)) {
  throw new AnalysisException(s"Cannot modify the value of a static config: $key")
 }
+if (sqlConf.setCommandRejectsSparkConfs &&
+ConfigEntry.findEntry(key) != null && !SQLConf.sqlConfEntries.containsKey(key)) {
--- End diff --

I'm sorry for the delay.
As per the comment 
https://github.com/apache/spark/pull/22887#issuecomment-442425557, I'd leave it 
as is for now.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-28 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236973308
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala ---
@@ -19,6 +19,12 @@ package org.apache.spark.sql.catalyst.util
 
 import java.util.{Map => JavaMap}
 
+/**
+ * A simple `MapData` implementation which is backed by 2 arrays.
+ *
+ * Note that, user is responsible to guarantee that the key array does not have duplicated
+ * elements, otherwise the behavior is undefined.
--- End diff --

nit: we might need to add the same note to the 3rd and 4th `ArrayBasedMapData.apply()` methods.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236955791
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
+  assert(keyType != NullType, "map key cannot be null type.")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
--- End diff --

We need to exempt `BinaryType` from `AtomicType` here.
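
Same `Array[Byte]` pitfall as in SPARK-26211, here for the deduplication map; a quick plain-Scala illustration:

    import scala.collection.mutable

    // With a hash-based map, an equal binary key is not recognized as a duplicate,
    // so the "last wins" policy would silently keep both entries.
    val keyToIndex = mutable.HashMap.empty[Any, Int]
    keyToIndex(Array[Byte](1, 2)) = 0
    println(keyToIndex.contains(Array[Byte](1, 2)))   // false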


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236958252
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow}
+import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}
+import org.apache.spark.unsafe.Platform
+
+class ArrayBasedMapBuilderSuite extends SparkFunSuite {
+
+  test("basic") {
+val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
+builder.put(1, 1)
+builder.put(InternalRow(2, 2))
+builder.putAll(new GenericArrayData(Seq(3)), new GenericArrayData(Seq(3)))
+val map = builder.build()
+assert(map.numElements() == 3)
+assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 1, 2 -> 2, 3 -> 3))
+  }
+
+  test("fail with null key") {
+val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
+builder.put(1, null) // null value is OK
+val e = intercept[RuntimeException](builder.put(null, 1))
+assert(e.getMessage.contains("Cannot use null as map key"))
+  }
+
+  test("remove duplicated keys with last wins policy") {
+val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
+builder.put(1, 1)
+builder.put(2, 2)
+builder.put(1, 2)
+val map = builder.build()
+assert(map.numElements() == 2)
+assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 2, 2 -> 2))
+  }
+
+  test("struct type key") {
+val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
+builder.put(InternalRow(1), 1)
+builder.put(InternalRow(2), 2)
+val unsafeRow = {
+  val row = new UnsafeRow(1)
+  val bytes = new Array[Byte](16)
+  row.pointTo(bytes, 16)
+  row.setInt(0, 1)
+  row
+}
+builder.put(unsafeRow, 3)
+val map = builder.build()
+assert(map.numElements() == 2)
+assert(ArrayBasedMapData.toScalaMap(map) == Map(InternalRow(1) -> 3, InternalRow(2) -> 2))
+  }
+
+  test("array type key") {
+val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
+builder.put(new GenericArrayData(Seq(1, 1)), 1)
+builder.put(new GenericArrayData(Seq(2, 2)), 2)
+val unsafeArray = {
+  val array = new UnsafeArrayData()
+  val bytes = new Array[Byte](24)
+  Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
+  array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
+  array.setInt(0, 1)
+  array.setInt(1, 1)
+  array
+}
+builder.put(unsafeArray, 3)
+val map = builder.build()
+assert(map.numElements() == 2)
+assert(ArrayBasedMapData.toScalaMap(map) ==
+  Map(new GenericArrayData(Seq(1, 1)) -> 3, new GenericArrayData(Seq(2, 2)) -> 2))
+  }
--- End diff --

Should we have a binary type key test as well?
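
A hypothetical sketch of such a test, mirroring the struct/array cases above (the exact expectations depend on how binary keys end up being deduplicated; `BinaryType` would need to be imported as well):

    test("binary type key") {
      val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
      builder.put(Array[Byte](1, 1), 1)
      builder.put(Array[Byte](2, 2), 2)
      builder.put(Array[Byte](1, 1), 3)   // should be treated as a duplicate of the first key
      val map = builder.build()
      assert(map.numElements() == 2)
    }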


---




[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...

2018-11-21 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23101
  
Jenkins, retest this please.


---




[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...

2018-11-20 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23101
  
also cc @wangyum for the owner of 
[SPARK-25330](https://issues.apache.org/jira/browse/SPARK-25330).


---




[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...

2018-11-20 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23101
  
cc @srowen 


---




[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...

2018-11-20 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23101
  
I guess we need to update the `dev/deps/spark-deps-hadoop-2.7` file as well.


---




[GitHub] spark issue #23082: [SPARK-26112][SQL] Update since versions of new built-in...

2018-11-19 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23082
  
Jenkins, retest this please.


---




[GitHub] spark issue #23082: [SPARK-26112][SQL] Update since versions of new built-in...

2018-11-18 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23082
  
cc @cloud-fan @gatorsmile @dongjoon-hyun 


---




[GitHub] spark pull request #23082: [SPARK-26112][SQL] Update since versions of new b...

2018-11-18 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23082

[SPARK-26112][SQL] Update since versions of new built-in functions.

## What changes were proposed in this pull request?

The following 5 functions were removed from branch-2.4:

- map_entries
- map_filter
- transform_values
- transform_keys
- map_zip_with

We should update the since version to 3.0.0.

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26112/since

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23082


commit f26db66986a12049e14d1b234840b66f0b96767f
Author: Takuya UESHIN 
Date:   2018-11-19T06:36:38Z

Update since version to 3.0.0.




---




[GitHub] spark issue #23045: [SPARK-26071][SQL] disallow map as map key

2018-11-18 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23045
  
LGTM.


---




[GitHub] spark pull request #23045: [SPARK-26071][SQL] disallow map as map key

2018-11-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23045#discussion_r234502854
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -521,13 +521,18 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp
 case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpression {
 
   override def checkInputDataTypes(): TypeCheckResult = {
-var funcName = s"function $prettyName"
+val funcName = s"function $prettyName"
 if (children.exists(!_.dataType.isInstanceOf[MapType])) {
   TypeCheckResult.TypeCheckFailure(
 s"input to $funcName should all be of type map, but it's " +
   children.map(_.dataType.catalogString).mkString("[", ", ", "]"))
 } else {
-  TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName)
+  val sameTypeCheck = TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName)
+  if (sameTypeCheck.isFailure) {
+sameTypeCheck
+  } else {
+TypeUtils.checkForMapKeyType(dataType.keyType)
--- End diff --

oh, I see. thanks!


---




[GitHub] spark pull request #23045: [SPARK-26071][SQL] disallow map as map key

2018-11-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23045#discussion_r234481249
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -521,13 +521,18 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp
 case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpression {
 
   override def checkInputDataTypes(): TypeCheckResult = {
-var funcName = s"function $prettyName"
+val funcName = s"function $prettyName"
 if (children.exists(!_.dataType.isInstanceOf[MapType])) {
   TypeCheckResult.TypeCheckFailure(
 s"input to $funcName should all be of type map, but it's " +
   children.map(_.dataType.catalogString).mkString("[", ", ", "]"))
 } else {
-  TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName)
+  val sameTypeCheck = TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName)
+  if (sameTypeCheck.isFailure) {
+sameTypeCheck
+  } else {
+TypeUtils.checkForMapKeyType(dataType.keyType)
--- End diff --

I don't think we need this. The children already shouldn't have map-type keys?


---




[GitHub] spark pull request #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.en...

2018-11-15 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23046#discussion_r234073072
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -280,7 +280,7 @@ object ShuffleExchangeExec {
   }
  // The comparator for comparing row hashcode, which should always be Integer.
   val prefixComparator = PrefixComparators.LONG
-  val canUseRadixSort = SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)
+  val canUseRadixSort = SQLConf.get.enableRadixSort
--- End diff --

Ah, yes, to be exact, if users specified the config in `SparkConf` before Spark started, it could be read.
I'll leave the decision about which branches we should backport this to to you and the other reviewers. @jiangxb1987 @cloud-fan 

---




[GitHub] spark issue #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.enableRad...

2018-11-15 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23046
  
cc @jiangxb1987 @cloud-fan 


---




[GitHub] spark pull request #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.en...

2018-11-15 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23046

[SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.enableRadixSort` instead of `SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)`.

## What changes were proposed in this pull request?

This is a follow-up of #20393.
We should read the conf `"spark.sql.sort.enableRadixSort"` from `SQLConf` instead of `SparkConf`, i.e., use `SQLConf.get.enableRadixSort` instead of `SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)`, otherwise the config is never read.
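
A short illustration of why the two lookups differ (a sketch; `spark` is an active `SparkSession`):

    // A session-level setting is visible through SQLConf.get ...
    spark.conf.set("spark.sql.sort.enableRadixSort", "false")
    println(org.apache.spark.sql.internal.SQLConf.get.enableRadixSort)   // false

    // ... but SparkEnv.get.conf only reflects the SparkConf captured when the
    // SparkContext was created, so a runtime change like the one above is never seen there.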

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-23207/conf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23046.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23046


commit d8db5f3759b998e3ef5720f56c1a048c1494a1c1
Author: Takuya UESHIN 
Date:   2018-11-15T11:30:13Z

Use `SQLConf.get.enableRadixSort` instead of `SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)`.




---




[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...

2018-11-15 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23031
  
Jenkins, retest this please.


---




[GitHub] spark issue #23031: [SPARK-26060][CORE][SQL] Track SparkConf entries and mak...

2018-11-14 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23031
  
@vanzin Thanks for letting me know; that's really good to know.
I'll update this to use it.


---




[GitHub] spark issue #23031: [SPARK-26060][CORE][SQL] Track SparkConf entries and mak...

2018-11-14 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/23031
  
Jenkins, retest this please.


---




[GitHub] spark pull request #23031: [SPARK-26060][CORE][SQL] Track SparkConf entries ...

2018-11-14 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/23031

[SPARK-26060][CORE][SQL] Track SparkConf entries and make SET command reject such entries.

## What changes were proposed in this pull request?

Currently the `SET` command works without any warning even if the specified key is for a `SparkConf` entry, and it has no effect because the command does not update `SparkConf`; this behavior might confuse users. We should track `SparkConf` entries and make the command reject such entries.
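
A sketch of the intended behavior from the description above (the exact exception type and message are assumptions here, not quoted from the patch):

    spark.sql("SET spark.sql.shuffle.partitions=10")   // SQLConf entry: still allowed
    try {
      spark.sql("SET spark.executor.memory=2g")        // SparkConf entry: now rejected
    } catch {
      case e: org.apache.spark.sql.AnalysisException => println(e.getMessage)
    }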

## How was this patch tested?

Added a test and existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-26060/set_command

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23031.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23031


commit e8de7bb6847486ab9416bda8c97e8b0ce6c3bbe2
Author: Takuya UESHIN 
Date:   2018-11-13T09:59:31Z

Register SparkConfig entries.

commit 33d831e9f69b6a7f5bf965ba7357e73a70ac4239
Author: Takuya UESHIN 
Date:   2018-11-13T11:20:25Z

Make SET command reject SparkConf entries.

commit e19b1e430e967d0aeb5fe2a112e3a82ca18c5c80
Author: Takuya UESHIN 
Date:   2018-11-14T08:15:09Z

Update the migration guide.




---




[GitHub] spark pull request #22200: [SPARK-25208][SQL] Loosen Cast.forceNullable for ...

2018-11-12 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22200#discussion_r232608144
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---
@@ -154,6 +154,15 @@ object Cast {
 fromPrecedence >= 0 && fromPrecedence < toPrecedence
   }
 
+  def canNullSafeCastToDecimal(from: DataType, to: DecimalType): Boolean = from match {
+case from: BooleanType if to.isWiderThan(DecimalType.BooleanDecimal) => true
+case from: NumericType if to.isWiderThan(from) => true
+case from: DecimalType =>
+  // truncating or precision lose
+  (to.precision - to.scale) > (from.precision - from.scale)
--- End diff --

In this case we need rounding, and then we need an extra digit of precision to avoid overflow.
E.g., casting 99.95 of Decimal(4, 2) to Decimal(3, 1) gives 100.0, which overflows and ends up as null. We need Decimal(4, 1) for the cast to be null-safe.
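
The same example worked out with `java.math.BigDecimal`, assuming HALF_UP rounding as in `Decimal.changePrecision`:

    import java.math.{BigDecimal => JBigDecimal, RoundingMode}

    val d = new JBigDecimal("99.95")                     // fits Decimal(4, 2)
    val rounded = d.setScale(1, RoundingMode.HALF_UP)    // rounding to scale 1
    println(rounded)               // 100.0
    println(rounded.precision())   // 4, so Decimal(3, 1) overflows but Decimal(4, 1) fits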


---




[GitHub] spark pull request #22969: [SPARK-22827][SQL][FOLLOW-UP] Throw `SparkOutOfMe...

2018-11-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22969#discussion_r231783323
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -787,7 +789,7 @@ case class HashAggregateExec(
  |$unsafeRowKeys, ${hashEval.value});
  |  if ($unsafeRowBuffer == null) {
  |// failed to allocate the first page
- |throw new OutOfMemoryError("No enough memory for aggregation");
+ |throw new $oomeClassName("No enough memory for aggregation");
--- End diff --

Yes, I think so based on my investigation. I grep-ed with 
"OutOfMemoryError" and checked the suspicious places.


---




[GitHub] spark issue #22969: [SPARK-22827][SQL][FOLLOW-UP] Throw `SparkOutOfMemoryErr...

2018-11-07 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22969
  
cc @sitalkedia @cloud-fan @gatorsmile 


---




[GitHub] spark pull request #22969: [SPARK-22827][SQL][FOLLOW-UP] Throw `SparkOutOfMe...

2018-11-07 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/22969

[SPARK-22827][SQL][FOLLOW-UP] Throw `SparkOutOfMemoryError` in `HashAggregateExec`, too.

## What changes were proposed in this pull request?

This is a follow-up PR of #20014, which introduced `SparkOutOfMemoryError` to avoid killing the entire executor when an `OutOfMemoryError` is thrown.
We should throw `SparkOutOfMemoryError` in `HashAggregateExec`, too.
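
For context, a rough sketch of the distinction (the class lives in Spark's internals under `org.apache.spark.memory`; this snippet is illustrative, not part of the diff): `SparkOutOfMemoryError` extends `OutOfMemoryError` but signals a failed task-level allocation, so it can be caught to fail only the task instead of being treated as a fatal JVM-wide OOM.

    import org.apache.spark.memory.SparkOutOfMemoryError

    def allocateFirstPage(succeeded: Boolean): Unit = {
      if (!succeeded) {
        // Same message as the generated code in HashAggregateExec.
        throw new SparkOutOfMemoryError("No enough memory for aggregation")
      }
    }

    try {
      allocateFirstPage(succeeded = false)
    } catch {
      // A task-level handler can catch this without killing the executor JVM.
      case e: SparkOutOfMemoryError => println(s"task failed: ${e.getMessage}")
    }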

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-22827/oome

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22969.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22969


commit f07ab0938563fe63dd20fa756543b14478a27c2f
Author: Takuya UESHIN 
Date:   2018-11-08T04:59:35Z

Throw `SparkOutOfMemoryError` in `HashAggregateExec`, too.




---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231408205
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -6323,6 +6333,33 @@ def ordered_window(self):
 def unpartitioned_window(self):
 return Window.partitionBy()
 
+@property
+def sliding_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+@property
+def sliding_range_window(self):
+from pyspark.sql.functions import lit
+return Window.partitionBy('id').orderBy('v').rangeBetween(lit(-2.0), lit(4.0))
+
+@property
+def growing_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)
+
+@property
+def growing_range_window(self):
+return Window.partitionBy('id').orderBy('v') \
+.rangeBetween(F.unboundedPreceding(), F.lit(4.0))
--- End diff --

Seems like `F.unboundedFollowing()` was removed in #22841.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231420974
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -73,68 +118,147 @@ case class WindowInPandasExec(
   }
 
   /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor 
side.
+   * Helper function to get all relevant helper functions and data 
structures for window bounds
*
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
+   * This function returns:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the 
python input row
+   * (3) Function from frame index to its upper bound column index in the 
python input row
+   * (4) Function that returns a frame requires window bound indices in 
the python input row
+   * (unbounded window doesn't need it)
+   * (5) Function from frame index to its eval type
*/
-  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
-val references = expressions.zipWithIndex.map { case (e, i) =>
-  // Results of window expressions will be on the right side of 
child's output
-  BoundReference(child.output.size + i, e.dataType, e.nullable)
+  private def computeWindowBoundHelpers(
+  factories: Seq[InternalRow => WindowFunctionFrame]
+  ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = {
+val dummyRow = new SpecificInternalRow()
+val functionFrames = factories.map(_(dummyRow))
+
+val evalTypes = functionFrames.map {
+  case _: UnboundedWindowFunctionFrame => 
PythonEvalType.SQL_UNBOUNDED_WINDOW_AGG_PANDAS_UDF
+  case _ => PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF
+}
+
+val requiredIndices = functionFrames.map {
+  case _: UnboundedWindowFunctionFrame => 0
+  case _ => 2
 }
-val unboundToRefMap = expressions.zip(references).toMap
-val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
-UnsafeProjection.create(
-  child.output ++ patchedWindowExpression,
-  child.output)
+
+val upperBoundIndices = requiredIndices.scan(0)(_ + _).tail
+
+val boundIndices = (requiredIndices zip upperBoundIndices).map {case 
(num, upperBoundIndex) =>
--- End diff --

nit: `.map { case`.
Also we might prefer `requiredIndices.zip(upperBoundIndices)`.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231427886
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -73,68 +118,147 @@ case class WindowInPandasExec(
   }
 
   /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor 
side.
+   * Helper function to get all relevant helper functions and data 
structures for window bounds
*
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
+   * This function returns:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the 
python input row
+   * (3) Function from frame index to its upper bound column index in the 
python input row
+   * (4) Function that returns a frame requires window bound indices in 
the python input row
+   * (unbounded window doesn't need it)
+   * (5) Function from frame index to its eval type
*/
-  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
-val references = expressions.zipWithIndex.map { case (e, i) =>
-  // Results of window expressions will be on the right side of 
child's output
-  BoundReference(child.output.size + i, e.dataType, e.nullable)
+  private def computeWindowBoundHelpers(
+  factories: Seq[InternalRow => WindowFunctionFrame]
+  ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = {
+val dummyRow = new SpecificInternalRow()
--- End diff --

This is just for calculating the `WindowFunctionFrame`s in advance, right?
I'm wondering whether there is a simpler way to find out whether a function frame is unbounded or not.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231407486
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -6323,6 +6333,33 @@ def ordered_window(self):
 def unpartitioned_window(self):
 return Window.partitionBy()
 
+@property
+def sliding_row_window(self):
+return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+@property
+def sliding_range_window(self):
+from pyspark.sql.functions import lit
+return Window.partitionBy('id').orderBy('v').rangeBetween(lit(-2.0), lit(4.0))
--- End diff --

`rangeBetween(column, column)` is not supported any more; it was removed in #22870.


---




[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231429605
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -73,68 +118,147 @@ case class WindowInPandasExec(
   }
 
   /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor 
side.
+   * Helper function to get all relevant helper functions and data 
structures for window bounds
*
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
+   * This function returns:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the 
python input row
+   * (3) Function from frame index to its upper bound column index in the 
python input row
+   * (4) Function that returns a frame requires window bound indices in 
the python input row
+   * (unbounded window doesn't need it)
+   * (5) Function from frame index to its eval type
*/
-  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
-val references = expressions.zipWithIndex.map { case (e, i) =>
-  // Results of window expressions will be on the right side of 
child's output
-  BoundReference(child.output.size + i, e.dataType, e.nullable)
+  private def computeWindowBoundHelpers(
+  factories: Seq[InternalRow => WindowFunctionFrame]
+  ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = {
+val dummyRow = new SpecificInternalRow()
+val functionFrames = factories.map(_(dummyRow))
+
+val evalTypes = functionFrames.map {
+  case _: UnboundedWindowFunctionFrame => 
PythonEvalType.SQL_UNBOUNDED_WINDOW_AGG_PANDAS_UDF
+  case _ => PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF
+}
+
+val requiredIndices = functionFrames.map {
+  case _: UnboundedWindowFunctionFrame => 0
+  case _ => 2
 }
-val unboundToRefMap = expressions.zip(references).toMap
-val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
-UnsafeProjection.create(
-  child.output ++ patchedWindowExpression,
-  child.output)
+
+val upperBoundIndices = requiredIndices.scan(0)(_ + _).tail
+
+val boundIndices = (requiredIndices zip upperBoundIndices).map {case 
(num, upperBoundIndex) =>
+if (num == 0) {
+  // Sentinel values for unbounded window
+  (-1, -1)
+} else {
+  (upperBoundIndex - 2, upperBoundIndex - 1)
+}
+}
+
+def lowerBoundIndex(frameIndex: Int) = boundIndices(frameIndex)._1
+def upperBoundIndex(frameIndex: Int) = boundIndices(frameIndex)._2
+def frameEvalType(frameIndex: Int) = evalTypes(frameIndex)
+def frameRequireIndex(frameIndex: Int) =
+  evalTypes(frameIndex) == 
PythonEvalType.SQL_BOUNDED_WINDOW_AGG_PANDAS_UDF
+
+(requiredIndices.sum, lowerBoundIndex, upperBoundIndex, 
frameRequireIndex, frameEvalType)
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-val inputRDD = child.execute()
+// Unwrap the expressions and factories from the map.
+val expressionsWithFrameIndex =
+  windowFrameExpressionFactoryPairs.map(_._1).zipWithIndex.flatMap {
+case (buffer, frameIndex) => buffer.map( expr => (expr, 
frameIndex))
+  }
+
+val expressions = expressionsWithFrameIndex.map(_._1)
+val expressionIndexToFrameIndex =
+  expressionsWithFrameIndex.map(_._2).zipWithIndex.map(_.swap).toMap
+
+val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
+
+val (numBoundIndices, lowerBoundIndex, upperBoundIndex, 
frameRequireIndex, frameEvalType) =
+  computeWindowBoundHelpers(factories)
+
+val funcEvalTypes = expressions.indices.map(
+  i => frameEvalType(expressionIndexToFrameIndex(i)))
+
+val numFrames = factories.length
+
+val inMemoryThreshold = 
sqlContext.conf.windowExecBufferInMemoryThreshold
+val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold
--- End diff --

`conf` should work instead of `sqlContext.conf`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231416643
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in 
a single partition.
+ *
+ * It is very similar to [[WindowExec]] and has similar logic. The main 
difference is that this
+ * node doesn't not compute any window aggregation values. Instead, it 
computes the lower and
+ * upper bound for each window (i.e. window bounds) and pass the data and 
indices to python work
+ * to do the actual window aggregation.
+ *
+ * It currently materialize all data associated with the same partition 
key and pass them to
+ * Python. This is not strictly necessary for sliding windows and can be 
improved (by slicing
+ * data into overlapping small chunks and stitch them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the python input because its 
unbounded window
+ * so it's bound indices will always be the same.
+ *
+ * Unbounded window also have a different eval type, because:
+ * (1) It doesn't have bound indices as input
+ * (2) The udf only needs to be evaluated once the in python worker 
(because the udf is
+ * deterministic and window bounds are the same for all windows)
+ *
+ * The logic to compute window bounds is delegated to 
[[WindowFunctionFrame]] and shared with
+ * [[WindowExec]]
+ *
+ * Note this doesn't support partial aggregation and all aggregation is 
computed from the entire
+ * window.
+ */
 case class WindowInPandasExec(
 windowExpression: Seq[NamedExpression],
 partitionSpec: Seq[Expression],
 orderSpec: Seq[SortOrder],
-child: SparkPlan) extends UnaryExecNode {
+child: SparkPlan
+) extends WindowExecBase(windowExpression, partitionSpec, orderSpec, 
child) {
--- End diff --

nit: style

```
child: SparkPlan)
  extends ...
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...

2018-11-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22305#discussion_r231413246
  
--- Diff: python/pyspark/worker.py ---
@@ -154,6 +154,47 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_bounded_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(begin_index, end_index, *series):
+import numpy as np
+import pandas as pd
+result = []
+for i in range(0, len(begin_index)):
+begin = begin_index[i]
+end = end_index[i]
+range_index = np.arange(begin, end)
+# Note: Create a slice from a series is actually pretty 
expensive to
+#   do for each window. However, there is no way to 
reduce/eliminate
+#   the cost of creating sub series here AFAIK.
+# TODO: s.take might be the best way to create sub series
+series_slices = [s.take(range_index) for s in series]
--- End diff --

We can also use `s[begin:end]`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...

2018-11-05 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22305
  
I'll do a review too, hopefully this week. Sorry for the delay.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22928: [SPARK-25926][CORE] Move config entries in core module t...

2018-11-05 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22928
  
Jenkins, retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22913: [SPARK-25902][SQL] Add support for dates with mil...

2018-11-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22913#discussion_r230635196
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java 
---
@@ -414,6 +416,21 @@ final int getInt(int rowId) {
 }
   }
 
+  private static class DateMilliAccessor extends ArrowVectorAccessor {
+
+private final DateMilliVector accessor;
+
+DateMilliAccessor(DateMilliVector vector) {
+  super(vector);
+  this.accessor = vector;
+}
+
+@Override
+final long getLong(int rowId) {
--- End diff --

We should use `getInt()` to represent the number of days from 1970-01-01 if 
we map the type to date type.
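
Just to illustrate the conversion (a rough sketch with a made-up helper name;
the real code would likely go through the existing date/time utilities rather
than doing this inline):

```scala
import java.util.concurrent.TimeUnit

// DateType stores the number of days since 1970-01-01 as an Int, so a
// millisecond-based Arrow value has to be truncated to whole days, dropping
// the time-of-day part. Note this truncates toward zero, so pre-1970 values
// would need floor semantics instead.
def millisToEpochDays(millisUtc: Long): Int =
  TimeUnit.MILLISECONDS.toDays(millisUtc).toInt
```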


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22942: [SPARK-25884][SQL][FOLLOW-UP] Add sample.json bac...

2018-11-04 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/22942

[SPARK-25884][SQL][FOLLOW-UP] Add sample.json back.

## What changes were proposed in this pull request?

This is a follow-up PR of #22892, which moved `sample.json` from the hive
module to the sql module, but we still need the file in the hive module.

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark 
issues/SPARK-25884/sample.json

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22942.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22942


commit 18ccff15a771d3e0221b49114ff300b0ef41a25b
Author: Takuya UESHIN 
Date:   2018-11-05T04:54:42Z

Add sample.json back.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22942: [SPARK-25884][SQL][FOLLOW-UP] Add sample.json back.

2018-11-04 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22942
  
cc @srowen @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT, and us...

2018-11-04 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22892
  
Seems like we still need `sample.json` in the hive module. I'll submit a
follow-up PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22913: [SPARK-25902][SQL] Add support for dates with mil...

2018-11-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22913#discussion_r230628333
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala 
---
@@ -71,6 +71,7 @@ object ArrowUtils {
 case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
 case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType
 case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND => 
TimestampType
+case date: ArrowType.Date if date.getUnit == DateUnit.MILLISECOND => 
TimestampType
--- End diff --

Notice that Spark doesn't have a date type with millisecond precision, so if we
map the type to date type, the hours, minutes, etc. will be lost. Otherwise, we
have to map it to timestamp type.
Which is the proper behavior? cc @BryanCutler 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22928: [SPARK-25926][CORE] Move config entries in core module t...

2018-11-02 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22928
  
cc @jiangxb1987 @cloud-fan @gatorsmile 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22928: [SPARK-25926][CORE] Move config entries in core m...

2018-11-02 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/22928

[SPARK-25926][CORE] Move config entries in core module to internal.config.

## What changes were proposed in this pull request?

Currently, the definitions of config entries in the `core` module are spread
across several files. We should move them into `internal/config` so they are
easier to manage.

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark 
issues/SPARK-25926/single_config_file

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22928.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22928


commit e1dfa2dc2ecf9423e5e5eeb9825d45c59188afa5
Author: Takuya UESHIN 
Date:   2018-11-01T04:28:30Z

Move configs for history server to internal.config.

commit 65371025b04376ecf72e379beabb885cbb252b88
Author: Takuya UESHIN 
Date:   2018-11-01T04:37:51Z

Move configs for status to internal.config.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT,...

2018-10-31 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22892#discussion_r229737749
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala
 ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{AnalysisException, ShowCreateTableSuite}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveShowCreateTableSuite extends ShowCreateTableSuite with 
TestHiveSingleton {
+
+  test("simple hive table") {
+withTable("t1") {
+  sql(
+s"""CREATE TABLE t1 (
+   |  c1 INT COMMENT 'bla',
+   |  c2 STRING
+   |)
+   |TBLPROPERTIES (
+   |  'prop1' = 'value1',
+   |  'prop2' = 'value2'
+   |)
+ """.stripMargin
+  )
+
+  checkCreateTable("t1")
+}
+  }
+
+  test("simple external hive table") {
+withTempDir { dir =>
+  withTable("t1") {
+sql(
+  s"""CREATE TABLE t1 (
+ |  c1 INT COMMENT 'bla',
+ |  c2 STRING
+ |)
+ |LOCATION '${dir.toURI}'
+ |TBLPROPERTIES (
+ |  'prop1' = 'value1',
+ |  'prop2' = 'value2'
+ |)
+   """.stripMargin
+)
+
+checkCreateTable("t1")
+  }
+}
+  }
+
+  test("partitioned hive table") {
--- End diff --

Yes, we have tests `"partitioned data source table"`, `"bucketed data 
source table"`, and `"partitioned bucketed data source table"` in 
`ShowCreateTableSuite`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT,...

2018-10-30 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22892#discussion_r229278709
  
--- Diff: sql/core/src/test/resources/sample.json ---
@@ -0,0 +1,2 @@
+{"a" : "2" ,"b" : "blah", "c_!@(3)":1}
--- End diff --

It is needed to run the test "data source table with user specified schema".
Actually, we just needed to move the file; I'll remove the extra one.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT,...

2018-10-30 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/22892

[SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT, and use LOCATION when 
SHOW CREATE TABLE.

## What changes were proposed in this pull request?

When running `SHOW CREATE TABLE` for Datasource tables, we are missing
`TBLPROPERTIES` and `COMMENT`, and we should use `LOCATION` instead of the path
in `OPTION`.

## How was this patch tested?

Split `ShowCreateTableSuite` to confirm it works with both `InMemoryCatalog`
and `HiveExternalCatalog`, and added some tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark 
issues/SPARK-25884/show_create_table

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22892.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22892


commit 2ca37061c37edb400c5779009c86abfb492032ac
Author: Takuya UESHIN 
Date:   2018-10-30T06:33:46Z

Split ShowCreateTableSuite.

commit c2dce69370415f64c4ed802e5bdc241f1c868596
Author: Takuya UESHIN 
Date:   2018-10-30T09:55:06Z

Add TBLPROPERTIES and COMMENT, and use LOCATION.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT, and us...

2018-10-30 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22892
  
cc @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22816: [SPARK-25822][PySpark]Fix a race condition when releasin...

2018-10-25 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22816
  
Thanks! merging to master/2.4/2.3.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22816: [SPARK-25822][PySpark]Fix a race condition when releasin...

2018-10-25 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22816
  
LGTM.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...

2018-10-19 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22466#discussion_r226624218
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -207,6 +207,14 @@ class SessionCatalog(
   "you cannot create a database with this name.")
 }
 validateName(dbName)
+// SPARK-25464 fail if DB location exists and is not empty
+val dbPath = new Path(dbDefinition.locationUri)
+val fs = dbPath.getFileSystem(hadoopConf)
+if (!externalCatalog.databaseExists(dbName) && fs.exists(dbPath)
+  && fs.listStatus(dbPath).nonEmpty) {
--- End diff --

Oh, btw, I noticed that @gatorsmile is asking to do the file existence check
in the external catalog, not in the session catalog. Could you move this check
there?
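
For illustration, a standalone version of the check that could live in the
external catalog (a sketch only; the method name and the exception type are
placeholders):

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Fail when the target database location already exists and is non-empty.
// In the real change this would be invoked from the external catalog's
// createDatabase, not from SessionCatalog.
def assertLocationEmptyOrMissing(locationUri: URI, hadoopConf: Configuration): Unit = {
  val dbPath = new Path(locationUri)
  val fs = dbPath.getFileSystem(hadoopConf)
  if (fs.exists(dbPath) && fs.listStatus(dbPath).nonEmpty) {
    throw new IllegalStateException(
      s"Cannot create database: location $locationUri already exists and is not empty")
  }
}
```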


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...

2018-10-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22466#discussion_r226536626
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---
@@ -840,12 +840,19 @@ abstract class DDLSuite extends QueryTest with 
SQLTestUtils {
   }
 
   test("create table in default db") {
-val catalog = spark.sessionState.catalog
-val tableIdent1 = TableIdentifier("tab1", None)
-createTable(catalog, tableIdent1)
-val expectedTableIdent = tableIdent1.copy(database = Some("default"))
-val expectedTable = generateTable(catalog, expectedTableIdent)
-checkCatalogTables(expectedTable, 
catalog.getTableMetadata(tableIdent1))
+var tablePath: Option[URI] = None
+try {
+  val catalog = spark.sessionState.catalog
+  val tableIdent1 = TableIdentifier("tab1", None)
+  createTable(catalog, tableIdent1)
+  val expectedTableIdent = tableIdent1.copy(database = Some("default"))
+  val expectedTable = generateTable(catalog, expectedTableIdent)
+  tablePath = Some(expectedTable.location)
+  checkCatalogTables(expectedTable, 
catalog.getTableMetadata(tableIdent1))
+} finally {
+  // This is external table,so it is required to deleted the path
--- End diff --

@HyukjinKwon The first one is `e,s` -> `e, s` ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22762: [SPARK-25763][SQL][PYSPARK][TEST] Use more `@cont...

2018-10-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22762#discussion_r226266575
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -225,6 +225,55 @@ def sql_conf(self, pairs):
 else:
 self.spark.conf.set(key, old_value)
 
+@contextmanager
+def database(self, *databases):
+"""
+A convenient context manager to test with some specific databases. 
This drops the given
+databases if exist and sets current database to "default" when it 
exits.
+"""
+assert hasattr(self, "spark"), "it should have 'spark' attribute, 
having a spark session."
+
+if len(databases) == 1 and isinstance(databases[0], (list, set)):
--- End diff --

Sure, I'll remove it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22466: [SPARK-25464][SQL] Create Database to the location,only ...

2018-10-18 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22466
  
Btw, what about `create database if not exists ...`? Seems like an exception
will be thrown if the location already exists and is non-empty, even if we
specify `if not exists`?
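
Something like the following would hit the new check, as far as I can tell
(a hypothetical repro; the path is just a placeholder and `spark` is an
existing `SparkSession`):

```scala
// The database does not exist in the catalog, but the target directory already
// exists and contains files, so the new check would throw even though
// IF NOT EXISTS is specified.
spark.sql("CREATE DATABASE IF NOT EXISTS some_db LOCATION '/tmp/existing_non_empty_dir'")
```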


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22762: [SPARK-25763][SQL][PYSPARK][TEST] Use more `@contextmana...

2018-10-18 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22762
  
cc @HyukjinKwon 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22762: [SPARK-25763][SQL][PYSPARK][TEST] Use more `@contextmana...

2018-10-18 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22762
  
Jenkins, retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...

2018-10-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22466#discussion_r226245229
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2993,6 +2990,7 @@ def test_current_database(self):
 AnalysisException,
 "does_not_exist",
 lambda: spark.catalog.setCurrentDatabase("does_not_exist"))
+spark.sql("DROP DATABASE some_db")
--- End diff --

Submitted #22762.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22762: [SPARK-25763][SQL][PYSPARK][TEST] Use more `@cont...

2018-10-18 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/22762

[SPARK-25763][SQL][PYSPARK][TEST] Use more `@contextmanager` to ensure 
clean-up each test.

## What changes were proposed in this pull request?

Currently, each test in `SQLTest` in PySpark is not cleaned up properly.
We should introduce and use more `@contextmanager` helpers to make it
convenient to clean up the context properly.

## How was this patch tested?

Modified tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark 
issues/SPARK-25763/cleanup_sqltests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22762.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22762


commit 9f7901b887f8fcd8e03a81e09548bd3525f3072e
Author: Takuya UESHIN 
Date:   2018-10-18T09:24:46Z

Use more `@contextmanager` to ensure clean-up each test.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...

2018-10-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22466#discussion_r226236226
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2993,6 +2990,7 @@ def test_current_database(self):
 AnalysisException,
 "does_not_exist",
 lambda: spark.catalog.setCurrentDatabase("does_not_exist"))
+spark.sql("DROP DATABASE some_db")
--- End diff --

We still need to clean up after each test. Otherwise, if the test fails before
dropping `some_db`, it will cause other tests to fail.
Seems like the current cleanup is not good enough. Let me refine it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in Pyspark

2018-10-17 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/21990
  
LGTM.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in P...

2018-10-17 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21990#discussion_r225811960
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -1136,4 +1121,27 @@ object SparkSession extends Logging {
   SparkSession.clearDefaultSession()
 }
   }
+
+  /**
+   * Initialize extensions if the user has defined a configurator class in 
their SparkConf.
+   * This class will be applied to the extensions passed into this 
function.
+   */
+  private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: 
SparkSessionExtensions) {
--- End diff --

I see, but in that case, we need to ensure that no injection of extensions 
is used in the default constructor to avoid initializing without extensions 
from the conf.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...

2018-10-17 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22646#discussion_r225807075
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
 })
 }
 }
-def createConverter(cls: Class[_], dataType: DataType): Any => Any = 
dataType match {
-  case struct: StructType => createStructConverter(cls, 
struct.map(_.dataType))
-  case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+def createConverter(t: Type, dataType: DataType): Any => Any = (t, 
dataType) match {
+  case (cls: Class[_], struct: StructType) =>
--- End diff --

Reusing `JavaTypeInference.serializerFor` would be great, but currently it 
behaves a little differently. At least it doesn't support 
`java.lang.Iterable[_]`, so we can't use it immediately. We need to extend it 
to support `Iterable` (and also `deserializerFor`).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...

2018-10-17 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22646#discussion_r225807045
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
 })
 }
 }
-def createConverter(cls: Class[_], dataType: DataType): Any => Any = 
dataType match {
-  case struct: StructType => createStructConverter(cls, 
struct.map(_.dataType))
-  case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+def createConverter(t: Type, dataType: DataType): Any => Any = (t, 
dataType) match {
--- End diff --

I'm okay with moving this to `CatalystTypeConverters`, but note that,
unfortunately, `CatalystTypeConverters` doesn't seem to work properly with
nested beans, which we are trying to support here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...

2018-10-17 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22646#discussion_r225806457
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
 })
 }
 }
-def createConverter(cls: Class[_], dataType: DataType): Any => Any = 
dataType match {
-  case struct: StructType => createStructConverter(cls, 
struct.map(_.dataType))
-  case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+def createConverter(t: Type, dataType: DataType): Any => Any = (t, 
dataType) match {
+  case (cls: Class[_], struct: StructType) =>
+// bean type
+createStructConverter(cls, struct.map(_.dataType))
+  case (arrayType: Class[_], array: ArrayType) if arrayType.isArray =>
+// array type
+val converter = createConverter(arrayType.getComponentType, 
array.elementType)
+value => new GenericArrayData(
+  (0 until JavaArray.getLength(value)).map(i =>
+converter(JavaArray.get(value, i))).toArray)
+  case (_, array: ArrayType) =>
+// java.util.List type
+val cls = classOf[java.util.List[_]]
--- End diff --

On second thoughts, we should use `java.lang.Iterable` here. We can convert
`Iterable` to `ArrayType`, as `ArrayConverter` is trying to do. If we use
`java.util.List` here, it leads to behavior changes for lists of primitives.
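
Roughly what I mean (a sketch only; `elementConverter` stands for the converter
created for the element type in the surrounding code):

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.util.GenericArrayData

// Convert any java.lang.Iterable (not just java.util.List) into the catalyst
// array representation, applying the element converter to each element.
def iterableConverter(elementConverter: Any => Any): Any => Any = { value =>
  val elements = value.asInstanceOf[java.lang.Iterable[Any]].asScala
  new GenericArrayData(elements.map(elementConverter).toArray)
}
```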


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in P...

2018-10-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21990#discussion_r225762148
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -1136,4 +1121,27 @@ object SparkSession extends Logging {
   SparkSession.clearDefaultSession()
 }
   }
+
+  /**
+   * Initialize extensions if the user has defined a configurator class in 
their SparkConf.
+   * This class will be applied to the extensions passed into this 
function.
+   */
+  private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: 
SparkSessionExtensions) {
--- End diff --

Oh, I see, moving to the default constructor was not a good idea.
How about the first suggestion?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in P...

2018-10-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21990#discussion_r225439067
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -1136,4 +1121,27 @@ object SparkSession extends Logging {
   SparkSession.clearDefaultSession()
 }
   }
+
+  /**
+   * Initialize extensions if the user has defined a configurator class in 
their SparkConf.
+   * This class will be applied to the extensions passed into this 
function.
+   */
+  private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: 
SparkSessionExtensions) {
--- End diff --

On second thoughts, we could move the method call to the top of the default 
constructor?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...

2018-10-15 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22724#discussion_r225389273
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 ---
@@ -196,6 +197,48 @@ object Literal {
 case other =>
   throw new RuntimeException(s"no default for type $dataType")
   }
+
+  private[expressions] def validateLiteralValue(value: Any, dataType: 
DataType): Unit = {
+def doValidate(v: Any, dataType: DataType): Boolean = dataType match {
+  case BooleanType => v.isInstanceOf[Boolean]
+  case ByteType => v.isInstanceOf[Byte]
+  case ShortType => v.isInstanceOf[Short]
+  case IntegerType | DateType => v.isInstanceOf[Int]
+  case LongType | TimestampType => v.isInstanceOf[Long]
+  case FloatType => v.isInstanceOf[Float]
+  case DoubleType => v.isInstanceOf[Double]
+  case _: DecimalType => v.isInstanceOf[Decimal]
+  case CalendarIntervalType => v.isInstanceOf[CalendarInterval]
+  case BinaryType => v.isInstanceOf[Array[Byte]]
+  case StringType => v.isInstanceOf[UTF8String]
+  case st: StructType =>
+v.isInstanceOf[InternalRow] && {
+  val row = v.asInstanceOf[InternalRow]
+  st.fields.map(_.dataType).zipWithIndex.forall {
+case (dt, i) => doValidate(row.get(i, dt), dt)
+  }
+}
+  case at: ArrayType =>
+v.isInstanceOf[GenericArrayData] && {
+  val ar = v.asInstanceOf[GenericArrayData].array
+  ar.isEmpty || doValidate(ar.head, at.elementType)
+}
+  case mt: MapType =>
+v.isInstanceOf[ArrayBasedMapData] && {
+  val map = v.asInstanceOf[ArrayBasedMapData]
+  map.numElements() == 0 || {
+doValidate(map.keyArray.array.head, mt.keyType) &&
+  doValidate(map.valueArray.array.head, mt.valueType)
+  }
+}
+  case ObjectType(cls) => cls.isInstance(v)
+  case udt: UserDefinedType[_] => doValidate(v, udt.sqlType)
+  case _ => false
--- End diff --

We need to add a `NullType` case?
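
i.e. something along these lines (shown as a standalone sketch here; in the
real code it would just be one more case in `doValidate`):

```scala
import org.apache.spark.sql.types.{DataType, NullType}

// A NullType literal should only ever carry a null value; any other type is
// left to the existing doValidate branches.
def isValidNullTypeValue(v: Any, dataType: DataType): Boolean = dataType match {
  case NullType => v == null
  case _ => false
}
```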


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...

2018-10-15 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22724#discussion_r225387436
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 ---
@@ -196,6 +197,48 @@ object Literal {
 case other =>
   throw new RuntimeException(s"no default for type $dataType")
   }
+
+  private[expressions] def validateLiteralValue(value: Any, dataType: 
DataType): Unit = {
+def doValidate(v: Any, dataType: DataType): Boolean = dataType match {
+  case BooleanType => v.isInstanceOf[Boolean]
+  case ByteType => v.isInstanceOf[Byte]
+  case ShortType => v.isInstanceOf[Short]
+  case IntegerType | DateType => v.isInstanceOf[Int]
+  case LongType | TimestampType => v.isInstanceOf[Long]
+  case FloatType => v.isInstanceOf[Float]
+  case DoubleType => v.isInstanceOf[Double]
+  case _: DecimalType => v.isInstanceOf[Decimal]
+  case CalendarIntervalType => v.isInstanceOf[CalendarInterval]
+  case BinaryType => v.isInstanceOf[Array[Byte]]
+  case StringType => v.isInstanceOf[UTF8String]
+  case st: StructType =>
+v.isInstanceOf[InternalRow] && {
+  val row = v.asInstanceOf[InternalRow]
+  st.fields.map(_.dataType).zipWithIndex.forall {
+case (dt, i) => doValidate(row.get(i, dt), dt)
+  }
+}
+  case at: ArrayType =>
+v.isInstanceOf[GenericArrayData] && {
+  val ar = v.asInstanceOf[GenericArrayData].array
+  ar.isEmpty || doValidate(ar.head, at.elementType)
+}
+  case mt: MapType =>
+v.isInstanceOf[ArrayBasedMapData] && {
+  val map = v.asInstanceOf[ArrayBasedMapData]
+  map.numElements() == 0 || {
+doValidate(map.keyArray.array.head, mt.keyType) &&
+  doValidate(map.valueArray.array.head, mt.valueType)
+  }
+}
--- End diff --

I'm wondering whether we need to check all of the elements for `ArrayType`
and `MapType`, or whether checking only the first one is enough.
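
For example, checking every element instead of only the head could look roughly
like this (a sketch; `doValidate` is passed in to stand for the recursive check
in the diff):

```scala
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types.{ArrayType, DataType}

// Validate every element of an array literal value, not just the first one.
def validateAllArrayElements(
    v: Any,
    at: ArrayType,
    doValidate: (Any, DataType) => Boolean): Boolean = v match {
  case arr: GenericArrayData => arr.array.forall(doValidate(_, at.elementType))
  case _ => false
}
```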


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...

2018-10-15 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22724#discussion_r225194050
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 ---
@@ -196,6 +197,31 @@ object Literal {
 case other =>
   throw new RuntimeException(s"no default for type $dataType")
   }
+
+  private[expressions] def validateLiteralValue(v: Any, dataType: 
DataType): Unit = {
+def doValidate(v: Any, dataType: DataType): Boolean = dataType match {
+  case BooleanType => v.isInstanceOf[Boolean]
+  case ByteType => v.isInstanceOf[Byte]
+  case ShortType => v.isInstanceOf[Short]
+  case IntegerType | DateType => v.isInstanceOf[Int]
+  case LongType | TimestampType => v.isInstanceOf[Long]
+  case FloatType => v.isInstanceOf[Float]
+  case DoubleType => v.isInstanceOf[Double]
+  case _: DecimalType => v.isInstanceOf[Decimal]
+  case CalendarIntervalType => v.isInstanceOf[CalendarInterval]
+  case BinaryType => v.isInstanceOf[Array[Byte]]
+  case StringType => v.isInstanceOf[UTF8String]
+  case _: StructType => v.isInstanceOf[InternalRow]
+  case _: ArrayType => v.isInstanceOf[ArrayData]
+  case _: MapType => v.isInstanceOf[MapData]
--- End diff --

Should we validate recursively for `StructType`, `ArrayType`, and `MapType`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21990: [SPARK-25003][PYSPARK] Use SessionExtensions in P...

2018-10-15 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21990#discussion_r225077270
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -1136,4 +1121,27 @@ object SparkSession extends Logging {
   SparkSession.clearDefaultSession()
 }
   }
+
+  /**
+   * Initialize extensions if the user has defined a configurator class in 
their SparkConf.
+   * This class will be applied to the extensions passed into this 
function.
+   */
+  private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: 
SparkSessionExtensions) {
--- End diff --

How about returning `SparkSessionExtensions` from this method, and 
modifying the secondary constructor of `SparkSession` as:

```scala
private[sql] def this(sc: SparkContext) {
  this(sc, None, None,
SparkSession.applyExtensionsFromConf(sc.getConf, new 
SparkSessionExtensions))
}
```

I'm a little worried about whether the order in which we apply the extensions
might matter.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...

2018-10-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22646#discussion_r224671775
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
 })
 }
 }
-def createConverter(cls: Class[_], dataType: DataType): Any => Any = 
dataType match {
-  case struct: StructType => createStructConverter(cls, 
struct.map(_.dataType))
-  case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+def createConverter(t: Type, dataType: DataType): Any => Any = (t, 
dataType) match {
+  case (cls: Class[_], struct: StructType) =>
+// bean type
+createStructConverter(cls, struct.map(_.dataType))
+  case (arrayType: Class[_], array: ArrayType) if arrayType.isArray =>
+// array type
+val converter = createConverter(arrayType.getComponentType, 
array.elementType)
+value => new GenericArrayData(
+  (0 until JavaArray.getLength(value)).map(i =>
+converter(JavaArray.get(value, i))).toArray)
+  case (_, array: ArrayType) =>
+// java.util.List type
+val cls = classOf[java.util.List[_]]
--- End diff --

Seems like `JavaTypeInference.inferDataType()` supports 
`java.lang.Iterable`, not only `List`, but serializer/deserializer don't. 
Should we change `inferDataType()`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...

2018-10-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22466#discussion_r224667371
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2993,6 +2990,7 @@ def test_current_database(self):
 AnalysisException,
 "does_not_exist",
 lambda: spark.catalog.setCurrentDatabase("does_not_exist"))
+spark.sql("DROP DATABASE some_db")
--- End diff --

We should surround this with try-finally?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...

2018-10-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22466#discussion_r224666263
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -350,9 +350,6 @@ def test_sqlcontext_reuses_sparksession(self):
 def tearDown(self):
--- End diff --

Now we can remove this method?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...

2018-10-10 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22309#discussion_r224318955
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
 ---
@@ -108,6 +108,16 @@ object TestingUDT {
   }
 }
 
+object TestingValueClass {
+  case class IntWrapper(i: Int) extends AnyVal
+  case class StrWrapper(s: String) extends AnyVal
+
+  case class ValueClassData(
+intField: Int,
+wrappedInt: IntWrapper,
+strField: String,
+wrappedStr: StrWrapper)
--- End diff --

We might need a comment to describe what this class looks like in Java.
Seems like it has 2 int fields, `intField` and `wrappedInt`, and 2 string
fields, `strField` and `wrappedStr`. I'm not sure it is the same in Scala 2.12,
though.
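
e.g. a comment roughly like this (assuming the usual value-class erasure; I
haven't verified the generated bytecode, so please double-check):

```scala
// Roughly what ValueClassData is expected to look like from Java, assuming the
// usual value-class erasure (not verified against the generated bytecode):
//
//   class ValueClassData {
//     int intField;
//     int wrappedInt;       // IntWrapper(i: Int) erased to a plain int
//     String strField;
//     String wrappedStr;    // StrWrapper(s: String) erased to a plain String
//   }
case class IntWrapper(i: Int) extends AnyVal
case class StrWrapper(s: String) extends AnyVal

case class ValueClassData(
    intField: Int,
    wrappedInt: IntWrapper,
    strField: String,
    wrappedStr: StrWrapper)
```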


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22545: [SPARK-25525][SQL][PYSPARK] Do not update conf fo...

2018-10-10 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22545#discussion_r223953269
  
--- Diff: python/pyspark/sql/session.py ---
@@ -156,7 +156,7 @@ def getOrCreate(self):
 default.
 
 >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
->>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") 
== "v1"
+>>> s1.conf.get("k1") == "v1"
--- End diff --

Submitted a pr to update the migration guide #22682.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22682: [SPARK-20946][SPARK-25525][SQL][FOLLOW-UP] Update...

2018-10-10 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/22682

[SPARK-20946][SPARK-25525][SQL][FOLLOW-UP] Update the migration guide.

## What changes were proposed in this pull request?

This is a follow-up pr of #18536 and #22545 to update the migration guide.

## How was this patch tested?

Build and check the doc locally.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark 
issues/SPARK-20946_25525/migration_guide

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22682.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22682


commit 3bf42af5695729597e1f49b11f50f2fb450ead7f
Author: Takuya UESHIN 
Date:   2018-10-10T06:29:20Z

Update the migration guide.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22682: [SPARK-20946][SPARK-25525][SQL][FOLLOW-UP] Update the mi...

2018-10-10 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22682
  
cc @gatorsmile 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...

2018-10-05 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22527
  
Seems like there is a merge commit in the Apache git repo
(https://git-wip-us.apache.org/repos/asf?p=spark.git), but it's not on GitHub
yet.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...

2018-10-05 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22527
  
Sorry, the merge script failed. Let me try again a while later.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...

2018-10-05 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/22527
  
I'd merge this so as not to block the follow-up PRs that add support for
array/list and map of beans.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


