[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16481





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-12 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95905076
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,10 +413,85 @@ case class DataSource(
     relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
-  def write(
-      mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+  /**
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
+   */
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+    // Don't glob path for the write path.  The contracts here are:
+    //  1. Only one output path can be specified on the write path;
+    //  2. Output path must be a legal HDFS style file system path;
+    //  3. It's OK that the output path doesn't exist yet;
+    val allPaths = paths ++ caseInsensitiveOptions.get("path")
+    val outputPath = if (allPaths.length == 1) {
+      val path = new Path(allPaths.head)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${allPaths.mkString(", ")}")
+    }
+
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
+
+    // If we are appending to a table that already exists, make sure the partitioning matches
+    // up.  If we fail to load the table for whatever reason, ignore the check.
+    if (mode == SaveMode.Append) {
+      val existingPartitionColumns = Try {
+        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+      }.getOrElse(Seq.empty[String])
+      // TODO: Case sensitivity.
+      val sameColumns =
+        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingPartitionColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
+    }
+
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val partitionAttributes = partitionColumns.map { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+    val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+      sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+      }.head
+    }
+    // For partitioned relation r, r.schema's column ordering can be different from the column
+    // ordering of data.logicalPlan (partition columns are all moved after data column).  This
+    // will be adjusted within InsertIntoHadoopFsRelation.
+    val plan =
+      InsertIntoHadoopFsRelationCommand(
+        outputPath = outputPath,
+        staticPartitions = Map.empty,
+        partitionColumns = partitionAttributes,
+        bucketSpec = bucketSpec,
+        fileFormat = format,
+        options = options,
+        query = data.logicalPlan,
+        mode = mode,
+        catalogTable = catalogTable,
+        fileIndex = fileIndex)
+    sparkSession.sessionState.executePlan(plan).toRdd
--- End diff --

To the reviewers: the code inside `writeInFileFormat` has no changes; it is extracted as-is from the original `write` method.
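
For reviewers skimming the thread, a hedged sketch of how the extracted method ends up being called on the new write-only path (an abbreviated illustration assembled from the diffs in this thread, not the verbatim PR code):

```scala
// Abbreviated sketch; members such as providingClass, sparkSession and
// caseInsensitiveOptions belong to the enclosing DataSource shown in the diff.
def write(mode: SaveMode, data: DataFrame): Unit = {
  if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
    throw new AnalysisException("Cannot save interval data type into external storage.")
  }
  providingClass.newInstance() match {
    case dataSource: CreatableRelationProvider =>
      dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
    case format: FileFormat =>
      writeInFileFormat(format, mode, data) // write only; no relation is resolved afterwards
    case _ =>
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}
```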



[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95724792
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,17 +413,22 @@ case class DataSource(
     relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   *
+   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   */
   def write(
--- End diff --

Sure. Will do it.





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95715805
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,17 +413,22 @@ case class DataSource(
     relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   *
+   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   */
   def write(
--- End diff --

Let's create a new `write` method that returns `Unit`, and rename this `write` to `writeAndRead`, which should eventually be removed.
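
A rough sketch of the suggested shape (a hypothetical trait just to make the proposal concrete; the actual methods would live on `DataSource`):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.sources.BaseRelation

// Hypothetical illustration of the proposed split.
trait DataSourceWriteApi {
  /** Write-only path for save(): writes `data` and resolves no relation afterwards. */
  def write(mode: SaveMode, data: DataFrame): Unit

  /** Legacy path: writes `data`, then re-resolves it as a [[BaseRelation]]; to be removed eventually. */
  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation
}
```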





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-11 Thread cenyuhai
Github user cenyuhai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95538748
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,17 +413,22 @@ case class DataSource(
     relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   *
+   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   */
   def write(
       mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+      data: DataFrame,
+      isForWriteOnly: Boolean = false): Option[BaseRelation] = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
 
     providingClass.newInstance() match {
       case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+        Some(dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data))
--- End diff --

Maybe we can add a parameter here and let the user choose true or false.
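
For illustration, call sites under the flagged signature from the diff above would look roughly like this (hypothetical helper names, assuming `DataSource.write` is reachable from the caller):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.BaseRelation

// Hypothetical call sites. save() only needs the side effect:
def saveOnly(ds: DataSource, mode: SaveMode, df: DataFrame): Unit =
  ds.write(mode, df, isForWriteOnly = true) // returns None; skips re-resolving the relation

// CTAS-style callers keep the default and read the relation back:
def saveAndResolve(ds: DataSource, mode: SaveMode, df: DataFrame): Option[BaseRelation] =
  ds.write(mode, df) // isForWriteOnly = false; returns Some(relation)
```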





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95299609
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -413,17 +413,22 @@ case class DataSource(
     relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   *
+   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   */
   def write(
       mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+      data: DataFrame,
+      isForWriteOnly: Boolean = false): Option[BaseRelation] = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
 
     providingClass.newInstance() match {
       case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+        Some(dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data))
--- End diff --

It would be really weird if `CreatableRelationProvider.createRelation` could return a relation whose schema differs from that of the written `data`. Is it safe to assume the schema won't change? cc @marmbrus @yhuai @liancheng
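
For reference, the trait in question looks like this (paraphrased from `org.apache.spark.sql.sources` as I understand it, rather than copied from the authoritative source):

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.BaseRelation

trait CreatableRelationProvider {
  /**
   * Saves the given DataFrame and returns a relation over the saved data.
   * Nothing in the contract currently forbids the returned relation's schema
   * from differing from `data`'s schema, which is the concern raised above.
   */
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}
```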





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95070345
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -494,8 +500,13 @@ case class DataSource(
         catalogTable = catalogTable,
         fileIndex = fileIndex)
     sparkSession.sessionState.executePlan(plan).toRdd
-    // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-    copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+    if (isForWriteOnly) {
+      // Exit earlier and return null
--- End diff --

Sure





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-07 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r95065596
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -494,8 +500,13 @@ case class DataSource(
         catalogTable = catalogTable,
         fileIndex = fileIndex)
     sparkSession.sessionState.executePlan(plan).toRdd
-    // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-    copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+    if (isForWriteOnly) {
+      // Exit earlier and return null
--- End diff --

I'd remove "and return null"





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-05 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94886105
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala ---
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
   }
 
   private def setupPartitionedDatasourceTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
-    if (clearMetricsBeforeCreate) {
--- End diff --

Nice.





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-05 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94886317
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -494,8 +500,13 @@ case class DataSource(
         catalogTable = catalogTable,
         fileIndex = fileIndex)
     sparkSession.sessionState.executePlan(plan).toRdd
-    // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-    copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+    if (isForWriteOnly) {
+      // Exit earlier and return null
+      null
--- End diff --

Maybe we can change it to return an `Option` instead?
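
As a hedged sketch, the tail of `write` would then end like this (simplified; it mirrors the quoted diff rather than the final PR code, and relies on the enclosing method's `isForWriteOnly` and `data`):

```scala
// Simplified sketch of the Option-returning tail of write(...):
if (isForWriteOnly) {
  // Early exit for save(): no caller reads the relation back, so skip
  // resolveRelation() and the filesystem scan it triggers.
  None
} else {
  // Replace the schema with that of the DataFrame we just wrote out
  // to avoid re-inferring it.
  Some(copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation())
}
```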





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94879343
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -494,8 +500,13 @@ case class DataSource(
         catalogTable = catalogTable,
         fileIndex = fileIndex)
     sparkSession.sessionState.executePlan(plan).toRdd
-    // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-    copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+    if (isForWriteOnly) {
+      // Exit earlier and return null
+      null
--- End diff --

I am not sure whether returning null is OK here. This is based on a [similar early-exit solution](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L178) used in `getOrInferFileFormatSchema`.





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16481#discussion_r94879140
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala ---
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
       })
       executorPool.shutdown()
       executorPool.awaitTermination(30, TimeUnit.SECONDS)
-      // check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
-      // METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
-      // only one thread can really do the build, so the listing job count is 2, the other
-      // one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
--- End diff --

This comment is not accurate. The extra counts are from the save API call in `setupPartitionedHiveTable`.
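
For context, the suite's metric checks follow roughly this pattern (a simplified sketch with a hypothetical table name, not the exact test code):

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics

// Simplified sketch: reset the counters, run the operation under test,
// then read the listing metrics back.
HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 = 999").count()
val filesDiscovered = HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount()
val listingJobs = HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount()
// After this PR, the save() call in table setup no longer does its own listing,
// so these counters reflect only the reads performed by the test body.
```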





[GitHub] spark pull request #16481: [SPARK-19092] [SQL] Save() API of DataFrameWriter...

2017-01-05 Thread gatorsmile
GitHub user gatorsmile opened a pull request:

https://github.com/apache/spark/pull/16481

[SPARK-19092] [SQL] Save() API of DataFrameWriter should not scan all the saved files

### What changes were proposed in this pull request?
`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) performs an unnecessary full filesystem scan of the files it has just saved. Since save() is the most basic/core API in `DataFrameWriter`, this scan should be avoided.

The related PR: https://github.com/apache/spark/pull/16090
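
To make the write path concrete, this is the kind of user call that should no longer trigger a scan (illustrative snippet with a hypothetical output path):

```scala
// Illustrative only: a plain save() should just write the files. Before this
// PR, it also re-resolved the written relation, listing all output files back in.
spark.range(100)
  .selectExpr("id as fieldOne", "id % 10 as partCol1")
  .write
  .partitionBy("partCol1")
  .mode("overwrite")
  .parquet("/tmp/spark-19092-demo") // hypothetical path
```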

### How was this patch tested?
Updated the existing test cases.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gatorsmile/spark saveFileScan

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16481


commit 5d38f09f47a767a342a0a8219c63efa2943b5d1f
Author: gatorsmile 
Date:   2017-01-05T23:53:55Z

fix.



