[GitHub] spark pull request #23141: [SPARK-26021][SQL][followup] add test for special...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23141#discussion_r239333919
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java ---
@@ -165,10 +165,14 @@ public void writeMinusZeroIsReplacedWithZero() {
 byte[] floatBytes = new byte[Float.BYTES];
 Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d);
 Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f);
-double doubleFromPlatform = Platform.getDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET);
-float floatFromPlatform = Platform.getFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET);

-Assert.assertEquals(Double.doubleToLongBits(0.0d), Double.doubleToLongBits(doubleFromPlatform));
-Assert.assertEquals(Float.floatToIntBits(0.0f), Float.floatToIntBits(floatFromPlatform));
+byte[] doubleBytes2 = new byte[Double.BYTES];
+byte[] floatBytes2 = new byte[Float.BYTES];
+Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, 0.0d);
+Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, 0.0f);
--- End diff --

ah good catch! I'm surprised this test passed before...


---




[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23239
  
Yes, the 3 cases I pointed out that need to handle NaN and -0.0 do not change the value in `UnsafeRow`.


---




[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23239
  
cc @adoron @kiszk @viirya @gatorsmile 


---




[GitHub] spark pull request #23239: [SPARK-26021][SQL][followup] only deal with NaN a...

2018-12-05 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23239

[SPARK-26021][SQL][followup] only deal with NaN and -0.0 in UnsafeWriter

## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/23043

There are 4 places where we need to deal with NaN and -0.0:
1. Range partitioner (the sorter): `-0.0` and `0.0` should be assigned to the same partition, and different NaNs should be assigned to the same partition.
2. Join keys: `-0.0` and `0.0` should be treated as the same value, and different NaNs should be treated as the same value.
3. Grouping keys: `-0.0` and `0.0` should be assigned to the same group, and different NaNs should be assigned to the same group.
4. Comparison expressions: `-0.0` and `0.0` should be treated as the same value, and different NaNs should be treated as the same value.

Case 4 is OK: our comparison already handles NaN and -0.0, and for struct/array/map we recursively compare the fields/elements.

Cases 1, 2 and 3 are problematic, as they compare the `UnsafeRow` binary directly; different NaNs have different binary representations, and the same is true for -0.0 and 0.0.

To fix it, a simple solution is to let `UnsafeProjection` always produce `UnsafeRow`s with NaN and -0.0 normalized (use the standard NaN and replace -0.0 with 0.0). The `UnsafeRow`s in cases 1, 2 and 3 are all created by `UnsafeProjection`.

Following this direction, this PR moves the handling of NaN and -0.0 from 
`Platform` to `UnsafeWriter`, so that places like `UnsafeRow.setFloat` will not 
handle them, which reduces the perf overhead. It's also easier to add comments 
explaining why we do it in `UnsafeWriter`.
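
To illustrate the normalization, here is a minimal sketch of the idea (not the actual `UnsafeWriter` code):

```scala
// Sketch: collapse every NaN to the canonical bit pattern and replace -0.0 with 0.0,
// so equal values always produce equal bytes in UnsafeRow and the binary comparison
// in cases 1-3 gives the expected result.
object FloatNormalization {
  def normalize(v: Double): Double = {
    if (v.isNaN) Double.NaN          // canonical NaN
    else if (v == -0.0d) 0.0d        // -0.0 == 0.0 numerically, but the bit patterns differ
    else v
  }

  def normalize(v: Float): Float = {
    if (v.isNaN) Float.NaN
    else if (v == -0.0f) 0.0f
    else v
  }
}
```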

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark minor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23239


commit 797ade3eb175c41866efbffa3cb4c30f90e49ca7
Author: Wenchen Fan 
Date:   2018-12-05T15:05:39Z

only deal with NaN and -0.0 in UnsafeWriter




---




[GitHub] spark issue #23235: [SPARK-26151][SQL][FOLLOWUP] Return partial results for ...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23235
  
thanks, merging to master!


---




[GitHub] spark issue #23227: [SPARK-26271][FOLLOW-UP][SQL] remove unuse object SparkP...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23227
  
thanks, merging to master!


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239090244
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+metricsReporter.incBytesWritten(v)
+_bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+metricsReporter.incRecordsWritten(v)
+_recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+metricsReporter.incWriteTime(v)
+_writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

cc @rxin , do you think we should change this metric to use ms as well? In 
all the places that read/write it.


---




[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23213
  
If we look at test coverage, `wholeStage=false, factoryMode=CODEGEN_ONLY` will go through code paths that whole-stage codegen doesn't cover. Or did I miss something?


---




[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23213
  
But whole-stage codegen will not test `GenerateUnsafeProjection`, `GenerateMutableProjection`, etc., right?


---




[GitHub] spark issue #23222: [SPARK-20636] Add the rule TransposeWindow to the optimi...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23222
  
Jenkins passes, which means the previously added end-to-end test can't show the benefit of this rule. We should update it.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239060606
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+metricsReporter.incBytesWritten(v)
+_bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+metricsReporter.incRecordsWritten(v)
+_recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+metricsReporter.incWriteTime(v)
+_writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

do we have other time metrics using nanoseconds?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239059162
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -163,6 +171,8 @@ object SQLMetrics {
 Utils.bytesToString
   } else if (metricsType == TIMING_METRIC) {
 Utils.msDurationToString
+  } else if (metricsType == NS_TIMING_METRIC) {
+duration => Utils.msDurationToString(duration / 1000 / 1000)
--- End diff --

will this string lose the nanosecond precision?


---




[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23213
  
How about `wholeStage=false, factoryMode=CODEGEN_ONLY`? I think it's different from `wholeStage=false, factoryMode=NO_CODEGEN`.
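
For reference, a sketch of the combinations being discussed, expressed as config sets like the ones in the diff (the exact keys should be taken from `SQLConf.WHOLESTAGE_CODEGEN_ENABLED` and `SQLConf.CODEGEN_FACTORY_MODE`; the string keys below are my assumption):

```scala
// Hypothetical enumeration of the configurations to run the suite under.
val codegenConfigSets = Seq(
  Seq("spark.sql.codegen.wholeStage" -> "true",  "spark.sql.codegen.factoryMode" -> "CODEGEN_ONLY"),
  Seq("spark.sql.codegen.wholeStage" -> "false", "spark.sql.codegen.factoryMode" -> "CODEGEN_ONLY"),
  Seq("spark.sql.codegen.wholeStage" -> "false", "spark.sql.codegen.factoryMode" -> "NO_CODEGEN"))
```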


---




[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23213
  
That's a lot of time...

Can we think more about the combinations of codegen and wholeStage? When we turn on whole-stage codegen but turn off codegen, what will happen?


---




[GitHub] spark issue #23222: [SPARK-20636] Add the rule TransposeWindow to the optimi...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23222
  
That PR also added an end-to-end test; does this mean that test is not valid?


---




[GitHub] spark pull request #17899: [SPARK-20636] Add new optimization rule to transp...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17899#discussion_r238950241
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -734,6 +734,28 @@ object CollapseWindow extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Transpose Adjacent Window Expressions.
--- End diff --

why is this rule useful?


---




[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23213
  
Do you know how long `SQLQueryTestSuite` takes? We are making it 4 times longer here, so it's better to know the overhead.


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r238949362
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -181,62 +180,39 @@ case class RelationConversions(
 conf: SQLConf,
 sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-serde.contains("parquet") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-  serde.contains("orc") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+isConvertible(relation.tableMeta)
   }
 
-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like 
`hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-// Consider table and storage properties. For properties existing in 
both sides, storage
-// properties will supersede table properties.
-if (serde.contains("parquet")) {
-  val options = 
relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-relation.tableMeta.storage.properties + 
(ParquetOptions.MERGE_SCHEMA ->
-
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-  sessionCatalog.metastoreCatalog
-.convertToLogicalRelation(relation, options, 
classOf[ParquetFileFormat], "parquet")
-} else {
-  val options = 
relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-relation.tableMeta.storage.properties
-  if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-  "orc")
-  } else {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-  "orc")
-  }
-}
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+val serde = 
tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+serde.contains("parquet") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+  serde.contains("orc") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
 plan resolveOperators {
   // Write path
   case InsertIntoTable(r: HiveTableRelation, partition, query, 
overwrite, ifPartitionNotExists)
 // Inserting into partitioned table is not supported in 
Parquet/Orc data source (yet).
   if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
 !r.isPartitioned && isConvertible(r) =>
-InsertIntoTable(convert(r), partition, query, overwrite, 
ifPartitionNotExists)
+InsertIntoTable(metastoreCatalog.convert(r), partition,
+  query, overwrite, ifPartitionNotExists)
 
   // Read path
   case relation: HiveTableRelation
   if DDLUtils.isHiveTable(relation.tableMeta) && 
isConvertible(relation) =>
-convert(relation)
+metastoreCatalog.convert(relation)
+
+  // CTAS
+  case CreateTable(tableDesc, mode, Some(query))
+  if DDLUtils.isHiveTable(tableDesc) && 
tableDesc.partitionColumnNames.isEmpty &&
+isConvertible(tableDesc) =>
--- End diff --

We usually don't write a migration guide for perf optimizations. Otherwise 
it's annoying to write one for each optimization and ask users to turn it off 
if something goes wrong. I think we only do that when there are known issues.


---




[GitHub] spark issue #23120: [SPARK-26151][SQL] Return partial results for bad CSV re...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23120
  
Hi @MaxGekk, since this changes the result (although it makes it better), do you mind adding a migration guide? Thanks!


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238909822
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
--- End diff --

For the write metrics, it's different: the default reporter calls the SQL one, which requires hacking the default one to register external reporters.

Maybe we should not change the read side, and instead just create a special `PairShuffleWriteMetricsReporter` to update both the SQL reporter and the default reporter.

Another idea is that `ShuffleDependency` carries a `reporter => reporter` function instead of a reporter. Then we can create a SQL reporter which takes another reporter (similar to the read side), and put the SQL reporter's constructor in `ShuffleDependency`.
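
A self-contained sketch of that second idea, with a simplified trait and illustrative names (not the actual Spark API):

```scala
// The dependency carries a (reporter => reporter) factory; on the executor the default
// per-task reporter is wrapped by the SQL reporter built from that factory.
trait WriteReporter { def incBytesWritten(v: Long): Unit }

class SqlWriteReporter(delegate: WriteReporter) extends WriteReporter {
  override def incBytesWritten(v: Long): Unit = {
    delegate.incBytesWritten(v)  // keep the default task metrics up to date
    // ... also update the SQLMetric of the exchange node here
  }
}

class DependencySketch(val reporterFactory: Option[WriteReporter => WriteReporter])

object ExecutorSide {
  // e.g. in ShuffleMapTask: wrap the default reporter only if the dependency asks for it.
  def reporterFor(dep: DependencySketch, taskReporter: WriteReporter): WriteReporter =
    dep.reporterFactory.map(factory => factory(taskReporter)).getOrElse(taskReporter)
}
```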


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r238908877
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -181,62 +180,39 @@ case class RelationConversions(
 conf: SQLConf,
 sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-serde.contains("parquet") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-  serde.contains("orc") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+isConvertible(relation.tableMeta)
   }
 
-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like 
`hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-// Consider table and storage properties. For properties existing in 
both sides, storage
-// properties will supersede table properties.
-if (serde.contains("parquet")) {
-  val options = 
relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-relation.tableMeta.storage.properties + 
(ParquetOptions.MERGE_SCHEMA ->
-
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-  sessionCatalog.metastoreCatalog
-.convertToLogicalRelation(relation, options, 
classOf[ParquetFileFormat], "parquet")
-} else {
-  val options = 
relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-relation.tableMeta.storage.properties
-  if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-  "orc")
-  } else {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-  "orc")
-  }
-}
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+val serde = 
tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+serde.contains("parquet") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+  serde.contains("orc") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
 plan resolveOperators {
   // Write path
   case InsertIntoTable(r: HiveTableRelation, partition, query, 
overwrite, ifPartitionNotExists)
 // Inserting into partitioned table is not supported in 
Parquet/Orc data source (yet).
   if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
 !r.isPartitioned && isConvertible(r) =>
-InsertIntoTable(convert(r), partition, query, overwrite, 
ifPartitionNotExists)
+InsertIntoTable(metastoreCatalog.convert(r), partition,
+  query, overwrite, ifPartitionNotExists)
 
   // Read path
   case relation: HiveTableRelation
   if DDLUtils.isHiveTable(relation.tableMeta) && 
isConvertible(relation) =>
-convert(relation)
+metastoreCatalog.convert(relation)
+
+  // CTAS
+  case CreateTable(tableDesc, mode, Some(query))
+  if DDLUtils.isHiveTable(tableDesc) && 
tableDesc.partitionColumnNames.isEmpty &&
+isConvertible(tableDesc) =>
--- End diff --

I don't mind adding `HiveUtils.CONVERT_METASTORE_ORC_CTAS`; maybe we can do it in a followup?


---




[GitHub] spark issue #23210: [SPARK-26233][SQL] CheckOverflow when encoding a decimal...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23210
  
a late LGTM


---




[GitHub] spark issue #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23217
  
thanks, merging to master!


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r238899698
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -181,62 +180,39 @@ case class RelationConversions(
 conf: SQLConf,
 sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-serde.contains("parquet") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-  serde.contains("orc") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+isConvertible(relation.tableMeta)
   }
 
-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like 
`hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-// Consider table and storage properties. For properties existing in 
both sides, storage
-// properties will supersede table properties.
-if (serde.contains("parquet")) {
-  val options = 
relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-relation.tableMeta.storage.properties + 
(ParquetOptions.MERGE_SCHEMA ->
-
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-  sessionCatalog.metastoreCatalog
-.convertToLogicalRelation(relation, options, 
classOf[ParquetFileFormat], "parquet")
-} else {
-  val options = 
relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-relation.tableMeta.storage.properties
-  if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-  "orc")
-  } else {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-  "orc")
-  }
-}
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+val serde = 
tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+serde.contains("parquet") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+  serde.contains("orc") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
 plan resolveOperators {
   // Write path
   case InsertIntoTable(r: HiveTableRelation, partition, query, 
overwrite, ifPartitionNotExists)
 // Inserting into partitioned table is not supported in 
Parquet/Orc data source (yet).
   if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
 !r.isPartitioned && isConvertible(r) =>
-InsertIntoTable(convert(r), partition, query, overwrite, 
ifPartitionNotExists)
+InsertIntoTable(metastoreCatalog.convert(r), partition,
+  query, overwrite, ifPartitionNotExists)
 
   // Read path
   case relation: HiveTableRelation
   if DDLUtils.isHiveTable(relation.tableMeta) && 
isConvertible(relation) =>
-convert(relation)
+metastoreCatalog.convert(relation)
+
+  // CTAS
+  case CreateTable(tableDesc, mode, Some(query))
+  if DDLUtils.isHiveTable(tableDesc) && 
tableDesc.partitionColumnNames.isEmpty &&
+isConvertible(tableDesc) =>
--- End diff --

It's not a new optimization... it's an optimization we dropped in 2.3 by mistake.

I'm fine with adding a config with default value true.


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r238706120
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,127 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table 
doesn't exist.
+  def writingCommandForNewTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, 
storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+InsertIntoHiveTable(
+  tableDesc,
+  Map.empty,
+  query,
+  overwrite = false,
+  ifPartitionNotExists = false,
+  outputColumnNames = outputColumnNames)
+  }
+
+  override def writingCommandForNewTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+// For CTAS, there is no static partition values to insert.
+val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
+InsertIntoHiveTable(
+  tableDesc,
+  partition,
+  query,
+  overwrite = true,
+  ifPartitionNotExists = false,
+  outputColumnNames = outputColumnNames)
+  }
+}
+
+/**
+ * Create table and insert the query result into it. This creates Hive 
table but inserts
+ * the query result into it by using data source.
+ *
+ * @param tableDesc the table description, which may contain serde, 
storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class OptimizedCreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  private def getHadoopRelation(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): HadoopFsRelation = {
+val metastoreCatalog = 
catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+val hiveTable = DDLUtils.readHiveTable(tableDesc)
+
+metastoreCatalog.convert(hiveTable) match {
+  case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
+  case _ => throw new AnalysisException(s"$tableIdentifier should be 
converted to " +
+"HadoopFsRelation.")
+}
+  }
+
+  override def writingCommandForExistingTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+val hadoopRelation = getHadoopRelation(catalog, tableDesc)
+InsertIntoHadoopFsRelationCommand(
+  hadoopRelation.location.rootPaths.head,
+  Map.empty, // We don't support to convert partitioned table.
+  false,
+  Seq.empty, // We don't support to convert partitioned table.
+  hadoopRelation.bucketSpec,
+  hadoopRelation.fileFormat,
+  hadoopRelation.options,
+  query,
+  mode,
+  Some(tableDesc),
+  Some(hadoopRelation.location),
+  query.output.map(_.name))
+  }
+
+  override def writingCommandForNewTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+val hadoopRelation = getHadoopRelation(catalog, tableDesc)
+InsertIntoHadoopFsRelationCommand(
+  hadoopRelation.location.rootPaths.head,
+  Map.empty, // We don't support to convert partitioned table.
+  false,
+  Seq.empty, // We don't support to convert partitioned table.
+  hadoopRelation.bucketSpec,
+  hadoopRelation.fileFormat,
+  hadoopRelation.options,
+ 

[GitHub] spark pull request #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat i...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23217#discussion_r238698103
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -47,13 +48,17 @@ class ArrayBasedMapBuilder(keyType: DataType, 
valueType: DataType) extends Seria
   private lazy val keyGetter = InternalRow.getAccessor(keyType)
   private lazy val valueGetter = InternalRow.getAccessor(valueType)
 
-  def put(key: Any, value: Any): Unit = {
+  def put(key: Any, value: Any, withSizeCheck: Boolean = false): Unit = {
 if (key == null) {
   throw new RuntimeException("Cannot use null as map key.")
 }
 
 val index = keyToIndex.getOrDefault(key, -1)
 if (index == -1) {
+  if (withSizeCheck && size >= 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
--- End diff --

Hmm, I'd like to avoid premature optimization. Actually, how much perf can this save? This code block is already doing some heavy work.


---




[GitHub] spark pull request #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat i...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23217#discussion_r238651534
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -47,13 +48,17 @@ class ArrayBasedMapBuilder(keyType: DataType, 
valueType: DataType) extends Seria
   private lazy val keyGetter = InternalRow.getAccessor(keyType)
   private lazy val valueGetter = InternalRow.getAccessor(valueType)
 
-  def put(key: Any, value: Any): Unit = {
+  def put(key: Any, value: Any, withSizeCheck: Boolean = false): Unit = {
 if (key == null) {
   throw new RuntimeException("Cannot use null as map key.")
 }
 
 val index = keyToIndex.getOrDefault(key, -1)
 if (index == -1) {
+  if (withSizeCheck && size >= 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+throw new RuntimeException(s"Unsuccessful attempt to concat maps 
with $size elements " +
--- End diff --

nit: `concat` -> `build`


---




[GitHub] spark pull request #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat i...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23217#discussion_r238651421
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -47,13 +48,17 @@ class ArrayBasedMapBuilder(keyType: DataType, 
valueType: DataType) extends Seria
   private lazy val keyGetter = InternalRow.getAccessor(keyType)
   private lazy val valueGetter = InternalRow.getAccessor(valueType)
 
-  def put(key: Any, value: Any): Unit = {
+  def put(key: Any, value: Any, withSizeCheck: Boolean = false): Unit = {
 if (key == null) {
   throw new RuntimeException("Cannot use null as map key.")
 }
 
 val index = keyToIndex.getOrDefault(key, -1)
 if (index == -1) {
+  if (withSizeCheck && size >= 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
--- End diff --

I think we should always check the size. Such a big map is very likely to cause problems.


---




[GitHub] spark issue #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23217
  
thanks for the cleanup!


---




[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r238650207
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 assert(requiredChildDistributions.length == children.length)
 assert(requiredChildOrderings.length == children.length)
 
+val aliasMap = 
AttributeMap[Expression](children.flatMap(_.expressions.collect {
+  case a: Alias => (a.toAttribute, a)
+}))
+
 // Ensure that the operator's children satisfy their output 
distribution requirements.
 children = children.zip(requiredChildDistributions).map {
-  case (child, distribution) if 
child.outputPartitioning.satisfies(distribution) =>
+  case (child, distribution) if child.outputPartitioning.satisfies(
+  distribution.mapExpressions(replaceAlias(_, aliasMap))) =>
--- End diff --

Seems we are not on the same page...

Let's make the example clearer. Assuming a `relation[a, b]`'s partitioning is `hash(a, b)`, then `Project(a as c, a as d, b, relation)`'s partitioning should be `[hash(c, b), hash(d, b)]`. It's like a flatMap.


---




[GitHub] spark issue #22468: [SPARK-25374][SQL] SafeProjection supports fallback to a...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22468
  
thanks, merging to master!


---




[GitHub] spark issue #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHOLESTAGE...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23213
  
We should create such a framework when we need to have per-file config 
settings for testing.


---




[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r238634730
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 assert(requiredChildDistributions.length == children.length)
 assert(requiredChildOrderings.length == children.length)
 
+val aliasMap = 
AttributeMap[Expression](children.flatMap(_.expressions.collect {
+  case a: Alias => (a.toAttribute, a)
+}))
+
 // Ensure that the operator's children satisfy their output 
distribution requirements.
 children = children.zip(requiredChildDistributions).map {
-  case (child, distribution) if 
child.outputPartitioning.satisfies(distribution) =>
+  case (child, distribution) if child.outputPartitioning.satisfies(
+  distribution.mapExpressions(replaceAlias(_, aliasMap))) =>
--- End diff --

For `Project(a as c, a as d, b, relation)`, I think the 
`outputPartitioning` should be `[hash part c, hash part d, hash part b]`. The 
point is, we should not report an output partitioning whose attribute is not 
even in the current plan's output.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238633725
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
   threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
 } else 0L
 
+// Register the shuffle write metrics reporter to shuffleWriteMetrics.
+if (dep.shuffleWriteMetricsReporter.isDefined) {
+  
context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

A simpler idea:
1. Create a `class GroupedShuffleWriteMetricsReporter(reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter`, which proxies all the metric updates to the input reporters (see the sketch below).
2. Create a `GroupedShuffleWriteMetricsReporter` instance here: `new GroupedShuffleWriteMetricsReporter(Seq(dep.shuffleWriteMetricsReporter.get, context.taskMetrics().shuffleWriteMetrics))`, and pass it to `manager.getWriter`.

I think we can use the same approach for read metrics as well.
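
A minimal sketch of the proposed proxy, assuming the `ShuffleWriteMetricsReporter` trait shown in the `core/src/main/scala/org/apache/spark/shuffle/metrics.scala` diff elsewhere in this thread (it would have to live in Spark core because of the `private[spark]` members):

```scala
// Proxies every metric update to all of the given reporters.
private[spark] class GroupedShuffleWriteMetricsReporter(
    reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter {
  override private[spark] def incBytesWritten(v: Long): Unit =
    reporters.foreach(_.incBytesWritten(v))
  override private[spark] def decBytesWritten(v: Long): Unit =
    reporters.foreach(_.decBytesWritten(v))
  override private[spark] def incRecordsWritten(v: Long): Unit =
    reporters.foreach(_.incRecordsWritten(v))
  override private[spark] def decRecordsWritten(v: Long): Unit =
    reporters.foreach(_.decRecordsWritten(v))
  override private[spark] def incWriteTime(v: Long): Unit =
    reporters.foreach(_.incWriteTime(v))
}
```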


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238630996
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
   threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
 } else 0L
 
+// Register the shuffle write metrics reporter to shuffleWriteMetrics.
+if (dep.shuffleWriteMetricsReporter.isDefined) {
+  
context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

This happens per task; I think `ShuffleWriteMetrics.externalReporters` can be an `Option` instead of an array.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238630981
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
   threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
 } else 0L
 
+// Register the shuffle write metrics reporter to shuffleWriteMetrics.
+if (dep.shuffleWriteMetricsReporter.isDefined) {
+  
context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

This happens per task; I think `ShuffleWriteMetrics.externalReporters` can be an `Option` instead of an array.


---




[GitHub] spark issue #23215: [SPARK-26263][SQL] Throw exception when Partition column...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23215
  
I think this new behavior makes more sense, but we need to add a migration 
guide.


---




[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23213#discussion_r238627633
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2899,6 +2899,144 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
   }
 }
   }
+
+  private def checkKeywordsExistsInExplain(df: DataFrame, keywords: 
String*): Unit = {
+val output = new java.io.ByteArrayOutputStream()
+Console.withOut(output) {
+  df.explain(extended = true)
+}
+val normalizedOutput = output.toString.replaceAll("#\\d+", "#x")
+for (key <- keywords) {
+  assert(normalizedOutput.contains(key))
+}
+  }
+
+  test("optimized plan should show the rewritten aggregate expression") {
--- End diff --

all the explain related tests.


---




[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r238626367
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 assert(requiredChildDistributions.length == children.length)
 assert(requiredChildOrderings.length == children.length)
 
+val aliasMap = 
AttributeMap[Expression](children.flatMap(_.expressions.collect {
+  case a: Alias => (a.toAttribute, a)
+}))
+
 // Ensure that the operator's children satisfy their output 
distribution requirements.
 children = children.zip(requiredChildDistributions).map {
-  case (child, distribution) if 
child.outputPartitioning.satisfies(distribution) =>
+  case (child, distribution) if child.outputPartitioning.satisfies(
+  distribution.mapExpressions(replaceAlias(_, aliasMap))) =>
--- End diff --

for example, `relation[a, b]`'s output partitioning is `[hash partition a, 
hash partition b]`, and `Project(a as c, b, relation)`'s output partitioning 
should be `[hash partition c, hash partition b]`.

What do you mean by `But ProjectExec.outputPartitioning cannot contain a 
reference to the aliases`?


---




[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23213#discussion_r238625581
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -144,9 +144,10 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
 val (comments, code) = input.split("\n").partition(_.startsWith("--"))
 
 // Runs all the tests on both codegen-only and interpreter modes
-val codegenConfigSets = Array(CODEGEN_ONLY, NO_CODEGEN).map {
-  case codegenFactoryMode =>
-Array(SQLConf.CODEGEN_FACTORY_MODE.key -> 
codegenFactoryMode.toString)
+val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", 
"CODEGEN_ONLY")).map {
--- End diff --

shall we test all the combinations? e.g. `wholeStage=on, codegen=off`


---




[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23213#discussion_r238625268
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2899,6 +2899,144 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
   }
 }
   }
+
+  private def checkKeywordsExistsInExplain(df: DataFrame, keywords: 
String*): Unit = {
+val output = new java.io.ByteArrayOutputStream()
+Console.withOut(output) {
+  df.explain(extended = true)
+}
+val normalizedOutput = output.toString.replaceAll("#\\d+", "#x")
+for (key <- keywords) {
+  assert(normalizedOutput.contains(key))
+}
+  }
+
+  test("optimized plan should show the rewritten aggregate expression") {
--- End diff --

can we move them to `ExplainSuite`?


---




[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23214
  
I think there is a problem, but no one found it because it only affects metrics.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23207
  
Can you share some ideas about it? IMO shuffle write metrics are hard, as an RDD can have shuffle dependencies with multiple upstream RDDs. That said, in general the shuffle write metrics should belong to the upstream RDDs.

In Spark SQL, it's a little simpler, as the `ShuffledRowRDD` always has only one child, so it's reasonable to say that shuffle write metrics belong to `ShuffledRowRDD`.

So we need to design a not-so-general shuffle write metrics API in Spark core, which will only be used in Spark SQL.


---




[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23214
  
It's easy to track `numKeyLookups` at `HashedRelation`, but it's hard to track `numProbes`. One idea is to pass a `MutableInt` to `LongToUnsafeRowMap.getValue` as a parameter, and in the method set the actual number of probes of this lookup on the `MutableInt` parameter.
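
A self-contained toy illustrating the pattern (the names and the map are illustrative, not Spark's actual `LongToUnsafeRowMap` API):

```scala
// The lookup reports how many probes it needed through a mutable out-parameter,
// so the caller (e.g. a per-HashedRelation wrapper) can aggregate numKeyLookups/numProbes.
final class MutableInt(var value: Int = 0)

// A tiny open-addressing map; assumes it is never filled to capacity.
final class ToyLongMap(capacity: Int) {
  private val keys = new Array[Long](capacity)
  private val used = new Array[Boolean](capacity)

  def put(key: Long): Unit = {
    var pos = (key % capacity).toInt.abs
    while (used(pos) && keys(pos) != key) pos = (pos + 1) % capacity
    keys(pos) = key
    used(pos) = true
  }

  // Returns whether the key is present; writes this lookup's probe count into `probes`.
  def contains(key: Long, probes: MutableInt): Boolean = {
    var pos = (key % capacity).toInt.abs
    var numProbes = 1
    while (used(pos) && keys(pos) != key) {
      pos = (pos + 1) % capacity
      numProbes += 1
    }
    probes.value = numProbes
    used(pos)
  }
}
```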


---




[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23214
  
I might know the root cause: `LongToUnsafeRowMap` is actually accessed by multiple threads.

For broadcast hash join, we will copy the broadcasted hash relation to avoid multi-thread problems, via `HashedRelation.asReadOnlyCopy`. However, this is a shallow copy: the `LongToUnsafeRowMap` is not copied and is likely shared by multiple `HashedRelation`s.

The metrics are per-task, so I think a better fix is to track the hash probe metrics per `HashedRelation`, instead of per `LongToUnsafeRowMap`. It's too costly to copy the `LongToUnsafeRowMap`; we should think about how to do it efficiently. cc @hvanhovell 
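
A minimal illustration of the shallow-copy issue (toy classes, not the actual Spark code):

```scala
import scala.collection.mutable

// asReadOnlyCopy returns a new wrapper, but both wrappers point at the same underlying map,
// so any per-map counters are updated from every task that got a "copy".
class ToyHashedRelation(val map: mutable.LongMap[String]) {
  def asReadOnlyCopy: ToyHashedRelation = new ToyHashedRelation(map) // shallow copy
}

object ShallowCopyDemo extends App {
  val original = new ToyHashedRelation(mutable.LongMap(1L -> "a"))
  val copy = original.asReadOnlyCopy
  assert(copy.map eq original.map) // the same LongToUnsafeRowMap-like instance is shared
}
```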


---




[GitHub] spark pull request #23214: [SPARK-26155] Optimizing the performance of LongT...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23214#discussion_r238549645
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 ---
@@ -398,8 +399,8 @@ private[execution] final class LongToUnsafeRowMap(val 
mm: TaskMemoryManager, cap
   private var numKeys = 0L
 
   // Tracking average number of probes per key lookup.
-  private var numKeyLookups = 0L
-  private var numProbes = 0L
+  private var numKeyLookups = new LongAdder
+  private var numProbes = new LongAdder
--- End diff --

I'm surprised. I think `LongToUnsafeRowMap` is used in a single-threaded environment and multi-thread contention should not be an issue here. Do you have any insights about how this fixes the perf issue?


---




[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23214
  
ok to test


---




[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23171
  
How about we create an `OptimizedIn` and convert `In` to `OptimizedIn` if the list is all literals? `OptimizedIn` would pick `switch` or a hash set based on the length of the list.
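
A self-contained sketch of the dispatch idea behind such an `OptimizedIn` (the threshold and names are illustrative; in the real expression the short-list branch would be a generated `switch`):

```scala
object OptimizedInSketch {
  // Builds a membership predicate: a chain of equality checks for short literal lists
  // (the moral equivalent of a codegen'd switch), and a hash set for long ones.
  def buildPredicate(literals: Seq[Int], switchThreshold: Int = 10): Int => Boolean = {
    if (literals.length <= switchThreshold) {
      v => literals.exists(_ == v)
    } else {
      val set = literals.toSet
      v => set.contains(v)
    }
  }
}
```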


---




[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238534101
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+val mapResultRow = convertBackToInternalRow(mapRow, 
fields4).toSeq(fields4)
+val mapExpectedRow = mapRow.toSeq(fields4)
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

Or we can use `ExpressionEvalHelper.checkResult` here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238533700
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+val mapResultRow = convertBackToInternalRow(mapRow, 
fields4).toSeq(fields4)
+val mapExpectedRow = mapRow.toSeq(fields4)
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

maybe we should implement `equals` and `hashCode` in `ArrayBasedMapData` 
and `UnsafeMapData`.
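
A rough, standalone illustration with simplified types (not the actual `ArrayBasedMapData`/`UnsafeMapData` classes) of what structural `equals`/`hashCode` over the key and value arrays could look like; note this is order-sensitive, so it is only correct if both sides store entries in the same order:

    class SimpleMapData(val keys: IndexedSeq[Any], val values: IndexedSeq[Any]) {
      override def equals(other: Any): Boolean = other match {
        // compare the key array and value array element-wise
        case that: SimpleMapData => keys == that.keys && values == that.values
        case _ => false
      }
      override def hashCode(): Int = 31 * keys.hashCode() + values.hashCode()
    }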


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23171
  
I think `InSet` is not an optimized version of `In`, but just a way to 
separate the implementations for different conditions (the length of the list). 
Maybe we should do the same thing here: create an `InSwitch` and convert `In` to 
it when certain conditions are met. One problem is that `In` and `InSwitch` are the 
same in the interpreted version, so maybe we should create a base class for them.
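
A hypothetical sketch of the base-class idea (`InBase`, `GenericIn`, and `InSwitch` are illustrative names, not existing Catalyst classes): the interpreted path is shared, while the two concrete nodes would only differ in how they generate code.

    trait InBase {
      def list: Seq[Any]
      // shared interpreted evaluation: a linear scan over the literal list
      def evalIn(value: Any): Boolean = list.contains(value)
    }
    case class GenericIn(list: Seq[Any]) extends InBase   // generic codegen path
    case class InSwitch(list: Seq[Any]) extends InBase    // would codegen a Java switch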


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23171
  
@rxin I proposed the same thing before, but one problem is that we only 
convert `In` to `InSet` when the length of the list reaches the threshold. If the 
`switch` approach is faster than a hash set when the list is small, it still seems 
worthwhile to optimize `In` using `switch`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23212: [SPARK-25498][SQL][FOLLOW-UP] Return an empty config set...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23212
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r238524973
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
--- End diff --

I don't have a better name in mind, so I'll leave it as `WriteSupport` for 
now. Naming suggestions that match `Scan` are welcome!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r238524763
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 assert(requiredChildDistributions.length == children.length)
 assert(requiredChildOrderings.length == children.length)
 
+val aliasMap = 
AttributeMap[Expression](children.flatMap(_.expressions.collect {
+  case a: Alias => (a.toAttribute, a)
+}))
+
 // Ensure that the operator's children satisfy their output 
distribution requirements.
 children = children.zip(requiredChildDistributions).map {
-  case (child, distribution) if 
child.outputPartitioning.satisfies(distribution) =>
+  case (child, distribution) if child.outputPartitioning.satisfies(
+  distribution.mapExpressions(replaceAlias(_, aliasMap))) =>
--- End diff --

As an example, `ProjectExec.outputPartitioning` can be wrong, as it doesn't 
consider the aliases in the project list. I think it's clearer to adjust 
`outputPartitioning` there, instead of dealing with it in a rule. What if we 
have more rules that need to check `outputPartitioning` and 
`requiredChildDistribution`?
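
A rough sketch of that alternative: let the operator that introduces aliases rewrite its own output partitioning, so every rule that checks `satisfies(...)` sees through the aliases for free. Simplified standalone types; the real code would work on `SparkPlan`, `Alias`, and `AttributeReference`.

    case class Partitioning(columns: Seq[String])

    case class SimpleProject(
        projectList: Seq[(String, String)],   // (inputName, outputName) pairs
        childPartitioning: Partitioning) {

      // map input names to the output names introduced by this project
      private val aliasMap: Map[String, String] = projectList.toMap

      // output partitioning expressed in terms of this operator's own output
      def outputPartitioning: Partitioning =
        Partitioning(childPartitioning.columns.map(c => aliasMap.getOrElse(c, c)))
    }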


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238520267
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,100 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
+// we need to take care of it to compare rows.
+def toComparable(d: Any): Any = d match {
--- End diff --

this does nothing, doesn't it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238515544
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
 ---
@@ -106,4 +106,19 @@ class CodeGeneratorWithInterpretedFallbackSuite 
extends SparkFunSuite with PlanT
   assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === 
expected)
 }
   }
+
+  test("SPARK-25374 Correctly handles NoOp in SafeProjection") {
+val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), 
Literal.create(1)), NoOp)
+val input = InternalRow.fromSeq(1 :: 1 :: Nil)
+val expected = 2 :: null :: Nil
+withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
--- End diff --

nvm, this is the code style in this test suite


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238515444
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
 ---
@@ -106,4 +106,19 @@ class CodeGeneratorWithInterpretedFallbackSuite 
extends SparkFunSuite with PlanT
   assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === 
expected)
 }
   }
+
+  test("SPARK-25374 Correctly handles NoOp in SafeProjection") {
+val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), 
Literal.create(1)), NoOp)
+val input = InternalRow.fromSeq(1 :: 1 :: Nil)
+val expected = 2 :: null :: Nil
+withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
--- End diff --

can we use `testBothCodegenAndInterpreted`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238515227
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 ---
@@ -166,29 +166,40 @@ object UnsafeProjection
   }
 }
 
-/**
- * A projection that could turn UnsafeRow into GenericInternalRow
--- End diff --

can we keep this comment?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23212: [SPARK-25498][SQL][FOLLOW-UP] Return an empty config set...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23212
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238330712
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -157,4 +157,22 @@ object InternalRow {
   getValueNullSafe
 }
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = 
dt match {
--- End diff --

We can rebase now


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22512: [SPARK-25498][SQL] InterpretedMutableProjection should h...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22512
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238328405
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -148,12 +156,25 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
   })
   // When we are regenerating the golden files we don't need to run 
all the configs as they
   // all need to return the same result
-  if (regenerateGoldenFiles && configs.nonEmpty) {
-configs.take(1)
+  if (regenerateGoldenFiles) {
+if (configs.nonEmpty) {
+  configs.take(1)
+} else {
+  Array.empty[Array[(String, String)]]
--- End diff --

nit: since configs don't matter when generating the result, I think we can just 
return empty configs here. We can clean it up in a follow-up PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23152
  
thanks, merging to master/2.4!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23204#discussion_r238323331
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 ---
@@ -483,8 +470,6 @@ private[execution] final class LongToUnsafeRowMap(val 
mm: TaskMemoryManager, cap
*/
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
 if (isDense) {
-  numKeyLookups += 1
-  numProbes += 1
--- End diff --

+1, like I said in 
https://github.com/apache/spark/pull/23204/files#r238257371


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23204#discussion_r238322699
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 ---
@@ -483,8 +470,6 @@ private[execution] final class LongToUnsafeRowMap(val 
mm: TaskMemoryManager, cap
*/
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
 if (isDense) {
-  numKeyLookups += 1
-  numProbes += 1
--- End diff --

If this is proven to cause a perf regression, I think it's safer to revert 
`HashAggregateExec` as well, since they are doing the same thing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r238321460
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 assert(requiredChildDistributions.length == children.length)
 assert(requiredChildOrderings.length == children.length)
 
+val aliasMap = 
AttributeMap[Expression](children.flatMap(_.expressions.collect {
+  case a: Alias => (a.toAttribute, a)
+}))
+
 // Ensure that the operator's children satisfy their output 
distribution requirements.
 children = children.zip(requiredChildDistributions).map {
-  case (child, distribution) if 
child.outputPartitioning.satisfies(distribution) =>
+  case (child, distribution) if child.outputPartitioning.satisfies(
+  distribution.mapExpressions(replaceAlias(_, aliasMap))) =>
--- End diff --

instead of doing it here, shall we deal with aliases in 
`SparkPlan.outputPartitioning` for operators with a project list?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r238318256
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 assert(requiredChildDistributions.length == children.length)
 assert(requiredChildOrderings.length == children.length)
 
+val aliasMap = 
AttributeMap[Expression](children.flatMap(_.expressions.collect {
--- End diff --

is it safe to do so? I think we should only collect aliases from operators 
with a project list, i.e. `Project`, `Aggregate`, `Window`.
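
A small standalone sketch of that restriction: only harvest aliases from operators that actually carry a project list; everything else contributes nothing. The types here are illustrative, not the `EnsureRequirements` code itself.

    sealed trait Node
    case class Project(aliases: Map[String, String]) extends Node
    case class Aggregate(aliases: Map[String, String]) extends Node
    case object Filter extends Node   // no project list

    object AliasCollector {
      def collectAliases(children: Seq[Node]): Map[String, String] =
        children.flatMap {
          case p: Project   => p.aliases
          case a: Aggregate => a.aliases
          case _            => Map.empty[String, String]   // skip operators without a project list
        }.toMap
    }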


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23208
  
cc @rdblue @rxin @jose-torres  @gatorsmile @HyukjinKwon @gengliangwang 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r238313454
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
--- End diff --

I'll do it in my next PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r238313221
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 assertNotBucketed("save")
 
-val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-  source match {
-case provider: BatchWriteSupportProvider =>
-  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-source,
-df.sparkSession.sessionState.conf)
-  val options = sessionOptions ++ extraOptions
-
+val session = df.sparkSession
+val cls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
+if (classOf[TableProvider].isAssignableFrom(cls)) {
+  val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+provider, session.sessionState.conf)
+  val options = sessionOptions ++ extraOptions
+  val dsOptions = new DataSourceOptions(options.asJava)
+  provider.getTable(dsOptions) match {
+case table: SupportsBatchWrite =>
+  val relation = DataSourceV2Relation.create(table, dsOptions)
+  // TODO: revisit it. We should not create the `AppendData` 
operator for `SaveMode.Append`.
+  // We should create new end-users APIs for the `AppendData` 
operator.
--- End diff --

according to the discussion in 
https://github.com/apache/spark/pull/22688#issuecomment-428626027 , the 
behavior of append operator and `SaveMode.Append` can be different. We should 
revisit it when we have the new end-user write APIs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-03 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23208

[SPARK-25530][SQL] data source v2 API refactor (batch write)

## What changes were proposed in this pull request?

Adjust the batch write API to match the read API refactor after 
https://github.com/apache/spark/pull/23086

Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite`, 
and make it extend `Table`. It also cleans up some code as batch API is 
completed.

This PR also removes the test from 
https://github.com/apache/spark/pull/22688 . Now a data source must return a 
table for read/write. It's a little awkward to use it with the `SaveMode`-based 
write APIs, as users can append data to a non-existing table. `TableProvider` 
needs to return a `Table` instance with an empty schema if the table doesn't 
exist, so that we can write to it later. Hopefully we can remove the `SaveMode`-
based write APIs after the new APIs are finished and widely used.

A few notes about future changes:
1. We will create `SupportsStreamingWrite` later for streaming APIs
2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. 
for the new end-user write APIs. I think the streaming APIs would keep using 
`OutputMode`, and the new end-user write APIs will apply to batch only, at least in 
the near future.


## How was this patch tested?

existing tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark refactor-batch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23208.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23208


commit 00fc34fa793b922a48a4bf8e9f9cd0e3b688800b
Author: Wenchen Fan 
Date:   2018-12-03T14:38:43Z

data source v2 API refactor (batch write)




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23204#discussion_r238264122
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -57,12 +57,6 @@ class SQLMetric(val metricType: String, initValue: Long 
= 0L) extends Accumulato
 
   override def add(v: Long): Unit = _value += v
 
-  // We can set a double value to `SQLMetric` which stores only long 
value, if it is
--- End diff --

I think we can keep the changes in this file as well.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23204
  
I'm fine with reverting it if it caused a significant performance regression. We 
should revisit it later with different ideas, like updating the metrics for 
each batch instead of each record.

cc @viirya @gatorsmile 
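
A minimal sketch of the "update per batch instead of per record" idea floated above; the names and the batch size are illustrative, and `LongAdder` stands in for `SQLMetric`. Probe counts are accumulated locally and flushed into the shared metric once every N lookups.

    import java.util.concurrent.atomic.LongAdder

    class BatchedProbeMetric(metric: LongAdder, flushEvery: Int = 4096) {
      private var pendingProbes = 0L
      private var pendingLookups = 0

      def record(probes: Long): Unit = {
        pendingProbes += probes
        pendingLookups += 1
        if (pendingLookups >= flushEvery) flush()   // touch the shared metric rarely
      }

      // call once more when the task finishes so the tail is not lost
      def flush(): Unit = {
        metric.add(pendingProbes)
        pendingProbes = 0L
        pendingLookups = 0
      }
    }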


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23204#discussion_r23825
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 ---
@@ -374,22 +374,6 @@ class TungstenAggregationIterator(
 }
   }
 
-  TaskContext.get().addTaskCompletionListener[Unit](_ => {
--- End diff --

ditto, it's better to put this code block here, so let's keep this change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23204#discussion_r238257371
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -63,7 +63,7 @@ case class HashAggregateExec(
 "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak 
memory"),
 "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
 "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate 
time"),
-"avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg 
hash probe"))
+"avgHashmapProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg 
hashmap probe"))
--- End diff --

I know it's easy to just run the `git revert` command, but I'd like to 
revert it manually, since that PR was merged a long time ago. And we should still 
keep changes like this renaming, as they are not really related to the 
performance regression.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23204
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23186: [SPARK-26230][SQL]FileIndex: if case sensitive, validate...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23186
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23203: [SPARK-26252][PYTHON] Add support to run specific unitte...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23203
  
I used to run the pyspark tests via `python python/pyspark/sql/dataframe.py`, 
after setting `export PYTHONPATH="$(find "${SPARK_HOME}"/python/lib/ -name 
'py4j-*-src.zip' -type f | uniq)":"${SPARK_HOME}"/python`.

I'm happy to see an easier way to do it, though I'm not very familiar with 
these scripts. Thanks for doing it!



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23120: [SPARK-26151][SQL] Return partial results for bad CSV re...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23120
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23010: [SPARK-26012][SQL]Null and '' values should not cause dy...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23010
  
The root cause is that `DynamicPartitionDataWriter` treats null and the empty 
string as different partition values, and creates new files for each. However, both 
null and the empty string are converted to `__HIVE_DEFAULT_PARTITION__` at the end.

I think we should deal with invalid partition values up front, so that we 
don't need to worry about them during writing.
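
A rough sketch of that "deal with it up front" idea: normalize null and the empty string to the Hive default partition name once, before the writer groups rows into partition directories. The object and method names are illustrative.

    object PartitionValueNormalizer {
      val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

      def normalize(value: String): String =
        if (value == null || value.isEmpty) DefaultPartitionName else value

      def main(args: Array[String]): Unit = {
        // null and "" end up with the same partition directory name, so the
        // dynamic-partition writer no longer opens separate files for them
        assert(normalize(null) == normalize(""))
        println(normalize(""))   // __HIVE_DEFAULT_PARTITION__
      }
    }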


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23190: [SPARK-26117][FOLLOW-UP][SQL]throw SparkOutOfMemoryError...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23190
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238172470
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -148,12 +156,21 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
   })
   // When we are regenerating the golden files we don't need to run 
all the configs as they
   // all need to return the same result
-  if (regenerateGoldenFiles && configs.nonEmpty) {
+  if (regenerateGoldenFiles) {
 configs.take(1)
--- End diff --

what if `configs` is empty? `take(1)` will fail


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23186: [SPARK-26230][SQL]FileIndex: if case sensitive, validate...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23186
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23120: [SPARK-26151][SQL] Return partial results for bad CSV re...

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23120
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23120: [SPARK-26151][SQL] Return partial results for bad...

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23120#discussion_r238083349
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 ---
@@ -243,21 +243,27 @@ class UnivocityParser(
 () => getPartialResult(),
 new RuntimeException("Malformed CSV record"))
 } else {
-  try {
-// When the length of the returned tokens is identical to the 
length of the parsed schema,
-// we just need to convert the tokens that correspond to the 
required columns.
-var i = 0
-while (i < requiredSchema.length) {
+  // When the length of the returned tokens is identical to the length 
of the parsed schema,
+  // we just need to convert the tokens that correspond to the 
required columns.
+  var badRecordException: Option[Throwable] = None
+  var i = 0
+  while (i < requiredSchema.length) {
+try {
   row(i) = valueConverters(i).apply(getToken(tokens, i))
-  i += 1
+} catch {
+  case NonFatal(e) =>
+badRecordException = badRecordException.orElse(Some(e))
 }
+i += 1
+  }
+
+  if (badRecordException.isEmpty) {
 row
-  } catch {
-case NonFatal(e) =>
-  // For corrupted records with the number of tokens same as the 
schema,
-  // CSV reader doesn't support partial results. All fields other 
than the field
-  // configured by `columnNameOfCorruptRecord` are set to `null`.
-  throw BadRecordException(() => getCurrentInput, () => None, e)
+  } else {
+// For corrupted records with the number of tokens same as the 
schema,
+// CSV reader doesn't support partial results. All fields other 
than the field
+// configured by `columnNameOfCorruptRecord` are set to `null`.
--- End diff --

what do you mean here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23154: [SPARK-26195][SQL] Correct exception messages in some cl...

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23154
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23178: [SPARK-26216][SQL] Do not use case class as publi...

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23178#discussion_r238083055
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
 ---
@@ -38,114 +38,106 @@ import org.apache.spark.sql.types.DataType
  * @since 1.3.0
  */
 @Stable
--- End diff --

It's not a new API anyway; it would be weird to change `since` to 3.0.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23178: [SPARK-26216][SQL] Do not use case class as public API (...

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23178
  
thanks for the review, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23130
  
We don't need to block on it, but @MaxGekk, if you have time, it would be great to 
answer https://github.com/apache/spark/pull/23130#issuecomment-442491582

thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23190: [MINOR][SQL]throw SparkOutOfMemoryError intead of SparkE...

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23190
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23190: [MINOR][SQL]throw SparkOutOfMemoryError intead of SparkE...

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23190
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23187: [SPARK-26211][SQL][TEST][FOLLOW-UP] Combine test cases f...

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23187
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23153: [SPARK-26147][SQL] only pull out unevaluable pyth...

2018-12-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23153#discussion_r238082589
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -155,19 +155,20 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
 }
 
 /**
- * PythonUDF in join condition can not be evaluated, this rule will detect 
the PythonUDF
- * and pull them out from join condition. For python udf accessing 
attributes from only one side,
- * they are pushed down by operation push down rules. If not (e.g. user 
disables filter push
- * down rules), we need to pull them out in this rule too.
+ * PythonUDF in join condition can't be evaluated if it refers to 
attributes from both join sides.
+ * See `ExtractPythonUDFs` for details. This rule will detect un-evaluable 
PythonUDF and pull them
+ * out from join condition.
  */
 object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with 
PredicateHelper {
-  def hasPythonUDF(expression: Expression): Boolean = {
-expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+
+  private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean 
= {
+expr.find { e =>
+  PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && 
!canEvaluate(e, j.right)
--- End diff --

It's only possible to have a scalar UDF in a join condition, so changing it to 
`e.isInstanceOf[PythonUDF]` would be the same.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...

2018-11-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23176#discussion_r237770687
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
 ---
@@ -293,6 +293,54 @@ class PredicateSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 }
   }
 
+  test("INSET: binary") {
--- End diff --

good idea! we should test `In` and `InSet` together


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-11-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r237756348
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,98 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  def getDataWritingCommand(
--- End diff --

I feel it's better to have 2 methods: `writingCommandForExistingTable`, 
`writingCommandForNewTable`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-11-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r237756394
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,98 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  def getDataWritingCommand(
+catalog: SessionCatalog,
+tableDesc: CatalogTable,
+tableExists: Boolean): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the Table Describe, which may contain serde, storage 
handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def getDataWritingCommand(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable,
+  tableExists: Boolean): DataWritingCommand = {
+if (tableExists) {
+  InsertIntoHiveTable(
+tableDesc,
+Map.empty,
+query,
+overwrite = false,
+ifPartitionNotExists = false,
+outputColumnNames = outputColumnNames)
+} else {
+  // For CTAS, there is no static partition values to insert.
+  val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
+  InsertIntoHiveTable(
+tableDesc,
+partition,
+query,
+overwrite = true,
+ifPartitionNotExists = false,
+outputColumnNames = outputColumnNames)
+}
+  }
+}
+
+/**
+ * Create table and insert the query result into it. This creates Hive 
table but inserts
+ * the query result into it by using data source.
+ *
+ * @param tableDesc the Table Describe, which may contain serde, storage 
handler etc.
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-11-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r237753623
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,98 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  def getDataWritingCommand(
+catalog: SessionCatalog,
+tableDesc: CatalogTable,
+tableExists: Boolean): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the Table Describe, which may contain serde, storage 
handler etc.
--- End diff --

`table description`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-11-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r237753433
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,98 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  def getDataWritingCommand(
+catalog: SessionCatalog,
+tableDesc: CatalogTable,
+tableExists: Boolean): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the Table Describe, which may contain serde, storage 
handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def getDataWritingCommand(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable,
+  tableExists: Boolean): DataWritingCommand = {
+if (tableExists) {
+  InsertIntoHiveTable(
+tableDesc,
+Map.empty,
+query,
+overwrite = false,
+ifPartitionNotExists = false,
+outputColumnNames = outputColumnNames)
+} else {
+  // For CTAS, there is no static partition values to insert.
+  val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
+  InsertIntoHiveTable(
+tableDesc,
+partition,
+query,
+overwrite = true,
+ifPartitionNotExists = false,
+outputColumnNames = outputColumnNames)
+}
+  }
+}
+
+/**
+ * Create table and insert the query result into it. This creates Hive 
table but inserts
+ * the query result into it by using data source.
+ *
+ * @param tableDesc the Table Describe, which may contain serde, storage 
handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectWithDataSourceCommand(
--- End diff --

`OptimizedCreateHiveTableAsSelectCommand`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...

2018-11-29 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22957
  
LGTM, cc @viirya as well


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


