[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-09 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r240101369
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
+options: DataSourceOptions)
   extends LeafNode with MultiInstanceRelation with NamedRelation {
 
-  import DataSourceV2Relation._
-
   override def name: String = table.name()
 
   override def simpleString: String = {
 s"RelationV2${truncatedString(output, "[", ", ", "]")} $name"
   }
 
-  def newWriteSupport(): BatchWriteSupport = 
source.createWriteSupport(options, schema)
-
-  def newScanBuilder(): ScanBuilder = {
-val dsOptions = new DataSourceOptions(options.asJava)
-table.newScanBuilder(dsOptions)
+  def newWriteSupport(inputSchema: StructType, mode: SaveMode): 
Optional[BatchWriteSupport] = {
--- End diff --

Nit: add a comment for the method, especially about when it will return an empty result, even though it is explained in `SupportsBatchWrite.createBatchWriteSupport`.
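
For illustration, a minimal sketch of the kind of doc comment being requested, using simplified stand-in types rather than the real Spark classes; the empty-`Optional` cases are inferred from the discussion, not from the actual implementation:

```scala
import java.util.Optional

// Simplified stand-ins for the types in the diff above; not the real Spark API.
trait SaveMode
trait BatchWriteSupport
trait Table
trait SupportsBatchWrite extends Table {
  def createBatchWriteSupport(mode: SaveMode): Optional[BatchWriteSupport]
}

class RelationSketch(table: Table) {
  /**
   * Returns a fresh write support for this table, or an empty `Optional` when
   * the table does not support batch writes at all or rejects the given
   * [[SaveMode]] (mirroring what `SupportsBatchWrite.createBatchWriteSupport`
   * documents).
   */
  def newWriteSupport(mode: SaveMode): Optional[BatchWriteSupport] = table match {
    case w: SupportsBatchWrite => w.createBatchWriteSupport(mode)
    case _ => Optional.empty[BatchWriteSupport]()
  }
}
```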


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r240028574
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
+options: DataSourceOptions)
--- End diff --

It was done in multiple places before:

https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L62

https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L153

https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L171

If you strongly prefer it, I can follow that approach and update the code.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r240028515
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

I'm not convinced it's safe to remove `SaveMode` right away, when only an `Append` operator is currently implemented.

If we do it, `DataFrameWriter.save` would need to throw an exception in most cases, i.e. whenever the `mode` is not append. I don't think this is acceptable right now.

Can we discuss the removal of `SaveMode` at least after all the necessary 
new write operators are implemented?
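
To make the concern concrete, here is a rough sketch, with simplified stand-in types rather than the real `DataFrameWriter` internals, of what `save` would be forced to do if `SaveMode` were removed while `AppendData` is the only v2 write operator:

```scala
// Simplified stand-ins; the real types live under org.apache.spark.sql.
object SaveModeConcernSketch {
  sealed trait SaveMode
  case object Append extends SaveMode
  case object Overwrite extends SaveMode
  case object ErrorIfExists extends SaveMode
  case object Ignore extends SaveMode

  trait LogicalPlan
  case class AppendData(table: String, query: LogicalPlan) extends LogicalPlan

  // With only an append operator available, every non-append mode can only fail.
  def planV2Write(table: String, query: LogicalPlan, mode: SaveMode): LogicalPlan =
    mode match {
      case Append => AppendData(table, query)
      case other =>
        throw new UnsupportedOperationException(
          s"SaveMode $other has no v2 write operator yet")
    }
}
```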


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239889152
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
+options: DataSourceOptions)
--- End diff --

A private method to do that existed in the past. Why not just revive it?
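
For reference, a sketch of what reviving that helper could look like if the relation kept a plain `Map[String, String]`; it assumes only the `DataSourceOptions(java.util.Map)` constructor already used elsewhere in this diff:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Hypothetical shape of the revived helper: the relation keeps a plain map
// and converts it to DataSourceOptions only where the v2 APIs need it.
case class RelationOptionsSketch(options: Map[String, String]) {
  private lazy val dsOptions: DataSourceOptions =
    new DataSourceOptions(options.asJava)

  def asDataSourceOptions: DataSourceOptions = dsOptions
}
```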


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239888975
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
  * {@code Dataset.write.format(...).option(...).save()}.
  */
 @Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
+public interface SupportsBatchWrite extends Table {
--- End diff --

I'm fine either way, as long as we are consistent between the read and 
write sides.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239888795
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

I think we can remove SaveMode right away. We don't need to break existing 
use cases if we add the OverwriteData plan and use it when the user's mode is 
Overwrite. That helps us get to the point where we can integrate SQL on top of 
this faster.
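
A sketch of the mode-to-operator mapping being proposed; `OverwriteData` here is the plan suggested in this comment and does not exist yet, and the types are simplified stand-ins:

```scala
object ModeToOperatorSketch {
  trait LogicalPlan
  case class AppendData(table: String, query: LogicalPlan) extends LogicalPlan
  // The operator proposed above; a stand-in, not an existing Spark plan.
  case class OverwriteData(table: String, query: LogicalPlan) extends LogicalPlan

  sealed trait SaveMode
  case object Append extends SaveMode
  case object Overwrite extends SaveMode

  // With both operators in place, the two common modes map directly to v2 plans,
  // so existing use cases keep working without SaveMode in the v2 API itself.
  def toV2Plan(table: String, query: LogicalPlan, mode: SaveMode): LogicalPlan =
    mode match {
      case Append    => AppendData(table, query)
      case Overwrite => OverwriteData(table, query)
    }
}
```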


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239684490
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
  * {@code Dataset.write.format(...).option(...).save()}.
  */
 @Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
+public interface SupportsBatchWrite extends Table {
--- End diff --

I do think read-only or write-only is a necessary feature, according to 
what I've seen in the dev list. Maybe we should move `newScanBuilder` from 
`Table` to the mixin traits.
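
A sketch of that layout with read support moved into its own mix-in, so a table can be read-only, write-only, or both; these are simplified stand-in traits that only mirror the names in the diff, with method parameters omitted:

```scala
// Simplified stand-ins mirroring the interfaces discussed in the diff.
trait ScanBuilder
trait BatchWriteSupport

trait Table {
  def name(): String // common metadata stays on Table
}
trait SupportsBatchRead extends Table {
  def newScanBuilder(): ScanBuilder // read capability as a mix-in
}
trait SupportsBatchWrite extends Table {
  def createBatchWriteSupport(): BatchWriteSupport // write capability as a mix-in
}

// A read-only table simply never mixes in SupportsBatchWrite.
class ReadOnlyTableSketch extends SupportsBatchRead {
  override def name(): String = "read_only_example"
  override def newScanBuilder(): ScanBuilder = new ScanBuilder {}
}
```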


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239683592
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
+options: DataSourceOptions)
--- End diff --

Because this makes the code cleaner; otherwise I need to write more code to convert a map to `DataSourceOptions` multiple times inside `DataSourceV2Relation`.

I don't have a strong preference here; I just picked the easiest approach for me. If you think using a map here is clearer, I can add the extra code.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239682984
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 assertNotBucketed("save")
 
-val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-  source match {
-case provider: BatchWriteSupportProvider =>
-  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-source,
-df.sparkSession.sessionState.conf)
-  val options = sessionOptions ++ extraOptions
-
+val session = df.sparkSession
+val cls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
+if (classOf[TableProvider].isAssignableFrom(cls)) {
+  val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+provider, session.sessionState.conf)
+  val options = sessionOptions ++ extraOptions
+  val dsOptions = new DataSourceOptions(options.asJava)
+  provider.getTable(dsOptions) match {
+case table: SupportsBatchWrite =>
+  val relation = DataSourceV2Relation.create(table, dsOptions)
+  // TODO: revisit it. We should not create the `AppendData` 
operator for `SaveMode.Append`.
+  // We should create new end-users APIs for the `AppendData` 
operator.
--- End diff --

Yeah, that's why I only left a comment and asked to revisit it later. I think we will see a clearer picture after we migrate the file source.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239682239
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

I don't want to break existing use cases: file sources can overwrite/append to a non-existing location, and we still need to support that with `SaveMode`.

Whatever the new write API ends up being, I think we still need to support `SaveMode` for a while.
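
A sketch of how a file-based provider could satisfy that contract, returning a table with an empty schema when the target path does not exist yet; the types and the `path` option handling are simplified stand-ins, not the real file source:

```scala
import java.nio.file.{Files, Paths}

// Stand-ins for the v2 interfaces; the schema is reduced to a list of column names.
trait Table { def schema(): Seq[String] }
trait TableProvider { def getTable(options: Map[String, String]): Table }

// Hypothetical provider: if the path exists, infer the schema from the files;
// if it does not, still return a Table, just with an empty schema, so that a
// SaveMode-based write to the non-existing location can proceed.
class FileProviderSketch extends TableProvider {
  override def getTable(options: Map[String, String]): Table = {
    val path = options.getOrElse("path", sys.error("path option is required"))
    val inferred =
      if (Files.exists(Paths.get(path))) Seq("inferred_col") // placeholder for real schema inference
      else Seq.empty[String]
    new Table { override def schema(): Seq[String] = inferred }
  }
}
```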


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239613722
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
  * {@code Dataset.write.format(...).option(...).save()}.
  */
 @Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
+public interface SupportsBatchWrite extends Table {
--- End diff --

`Table` exposes `newScanBuilder` without an interface. Why should the write 
side be different? Doesn't Spark support sources that are read-only and 
write-only?

I think that both reads and writes should use interfaces to mix support 
into `Table` or both should be exposed by `Table` and throw 
`UnsupportedOperationException` by default, not a mix of the two options.
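
For contrast, a sketch of the second option described above, where `Table` itself exposes both capabilities and throws by default; again simplified stand-in types, with parameters omitted:

```scala
// Stand-ins; in this layout the capabilities live on Table with throwing defaults.
trait ScanBuilder
trait BatchWriteSupport

trait Table {
  def name(): String
  def newScanBuilder(): ScanBuilder =
    throw new UnsupportedOperationException(s"$name does not support reads")
  def newWriteSupport(): BatchWriteSupport =
    throw new UnsupportedOperationException(s"$name does not support writes")
}

// A write-only table overrides only the write side; reads fail uniformly.
class WriteOnlyTableSketch extends Table {
  override def name(): String = "write_only_example"
  override def newWriteSupport(): BatchWriteSupport = new BatchWriteSupport {}
}
```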


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239613088
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
+options: DataSourceOptions)
--- End diff --

Why change this now, when DataSourceOptions will be replaced? I would say 
just leave it as a map and update it once later.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239598346
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 assertNotBucketed("save")
 
-val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-  source match {
-case provider: BatchWriteSupportProvider =>
-  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-source,
-df.sparkSession.sessionState.conf)
-  val options = sessionOptions ++ extraOptions
-
+val session = df.sparkSession
+val cls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
+if (classOf[TableProvider].isAssignableFrom(cls)) {
+  val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+provider, session.sessionState.conf)
+  val options = sessionOptions ++ extraOptions
+  val dsOptions = new DataSourceOptions(options.asJava)
+  provider.getTable(dsOptions) match {
+case table: SupportsBatchWrite =>
+  val relation = DataSourceV2Relation.create(table, dsOptions)
+  // TODO: revisit it. We should not create the `AppendData` 
operator for `SaveMode.Append`.
+  // We should create new end-users APIs for the `AppendData` 
operator.
--- End diff --

Here is what my branch uses for this logic:

```scala
val maybeTable = provider.getTable(identifier)
val exists = maybeTable.isDefined
(exists, mode) match {
  case (true, SaveMode.ErrorIfExists) =>
    throw new AnalysisException(s"Table already exists: ${identifier.quotedString}")

  case (true, SaveMode.Overwrite) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)

    runCommand(df.sparkSession, "insertInto") {
      OverwritePartitionsDynamic.byName(relation, df.logicalPlan)
    }

  case (true, SaveMode.Append) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)

    runCommand(df.sparkSession, "save") {
      AppendData.byName(relation, df.logicalPlan)
    }

  case (false, SaveMode.Append) |
       (false, SaveMode.ErrorIfExists) |
       (false, SaveMode.Ignore) |
       (false, SaveMode.Overwrite) =>

    runCommand(df.sparkSession, "save") {
      CreateTableAsSelect(catalog, identifier, Seq.empty, df.logicalPlan, options,
        ignoreIfExists = mode == SaveMode.Ignore)
    }

  case _ =>
    // table exists and mode is ignore
}
```


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239596456
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 assertNotBucketed("save")
 
-val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-  source match {
-case provider: BatchWriteSupportProvider =>
-  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-source,
-df.sparkSession.sessionState.conf)
-  val options = sessionOptions ++ extraOptions
-
+val session = df.sparkSession
+val cls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
+if (classOf[TableProvider].isAssignableFrom(cls)) {
+  val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+provider, session.sessionState.conf)
+  val options = sessionOptions ++ extraOptions
+  val dsOptions = new DataSourceOptions(options.asJava)
+  provider.getTable(dsOptions) match {
+case table: SupportsBatchWrite =>
+  val relation = DataSourceV2Relation.create(table, dsOptions)
+  // TODO: revisit it. We should not create the `AppendData` 
operator for `SaveMode.Append`.
+  // We should create new end-users APIs for the `AppendData` 
operator.
--- End diff --

The example in the referenced comment is this:

```
spark.range(1).format("source").write.save("non-existent-path")
```

If a path for a path-based table doesn't exist, then I think that the table 
doesn't exist. If a table doesn't exist, then the operation for `save` should 
be CTAS instead of AppendData.

Here, I think the right behavior is to check whether the provider returns a 
table. If it doesn't, then the table doesn't exist and the plan should be CTAS. 
If it does, then it must provide the schema used to validate the AppendData 
operation. Since we don't currently have CTAS, this should throw an exception 
stating that the table doesn't exist and can't be created.

More generally, the meaning of SaveMode with v1 is not always reliable. I 
think the right approach is what @cloud-fan suggests: create a new write API 
for v2 tables that is clear for the new logical plans (I've proposed one and 
would be happy to open a PR). Once the logical plans are in place, we can go 
back through this API and move it over to v2 where the behaviors match.
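
A sketch of the existence check described above, under the assumption that the provider can signal a missing table by returning nothing; the types are stand-ins and the AppendData schema validation is only hinted at:

```scala
object SavePlanningSketch {
  trait LogicalPlan
  trait Table { def schema(): Seq[String] }
  case class AppendData(table: Table, query: LogicalPlan) extends LogicalPlan

  def planSave(maybeTable: Option[Table], query: LogicalPlan): LogicalPlan =
    maybeTable match {
      case Some(table) =>
        // The existing table's schema is what the AppendData write is validated against.
        AppendData(table, query)
      case None =>
        // No table: the right plan would be CTAS, which is not implemented yet,
        // so the only honest behaviour is a clear error.
        throw new IllegalStateException(
          "Table does not exist and cannot be created (no CTAS support yet)")
    }
}
```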


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239581374
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

Maybe it should also be part of the `TableProvider` contract that if the 
table can't be located, it throws an exception?


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239578059
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

@jose-torres, create on write is done by CTAS. It should not be left up to 
the source whether to fail or create.

I think the confusion here is that this is a degenerate case where Spark 
has no ability to interact with the table's metadata. Spark must assume that it 
exists because the caller is writing to it.

The caller is indicating that a table exists, is identified by some 
configuration, and that a specific implementation can be used to write to it. 
That's what happens today when source implementations are directly specified.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239563967
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

"Exist" is a relative concept, I suppose. I think we need to somehow allow 
for create-on-write functionality, even if many table providers won't want to 
support it.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239559037
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

What does it mean to write to a non-existing table? If you're writing 
somewhere, the table must exist.

This is for creating a table directly from configuration and an 
implementation class in the DataFrameWriter API. The target of the write still 
needs to exist.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239469368
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
  * {@code Dataset.write.format(...).option(...).save()}.
  */
 @Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
+public interface SupportsBatchWrite extends Table {
--- End diff --

That's why I left https://github.com/apache/spark/pull/23208#discussion_r238524973.

Naming suggestions are welcome!


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239454594
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
  * {@code Dataset.write.format(...).option(...).save()}.
  */
 @Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
+public interface SupportsBatchWrite extends Table {
--- End diff --

To me it is quite confusing to have both `BatchWriteSupport` and `SupportsBatchWrite`.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r238524973
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
--- End diff --

I don't have a better name in mind, so I'm leaving it as `WriteSupport` for now. Better names that match `Scan` are welcome!


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r238313454
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
--- End diff --

I'll do it in my next PR.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r238313221
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 assertNotBucketed("save")
 
-val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-  source match {
-case provider: BatchWriteSupportProvider =>
-  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-source,
-df.sparkSession.sessionState.conf)
-  val options = sessionOptions ++ extraOptions
-
+val session = df.sparkSession
+val cls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
+if (classOf[TableProvider].isAssignableFrom(cls)) {
+  val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+provider, session.sessionState.conf)
+  val options = sessionOptions ++ extraOptions
+  val dsOptions = new DataSourceOptions(options.asJava)
+  provider.getTable(dsOptions) match {
+case table: SupportsBatchWrite =>
+  val relation = DataSourceV2Relation.create(table, dsOptions)
+  // TODO: revisit it. We should not create the `AppendData` 
operator for `SaveMode.Append`.
+  // We should create new end-users APIs for the `AppendData` 
operator.
--- End diff --

According to the discussion in https://github.com/apache/spark/pull/22688#issuecomment-428626027, the behavior of the append operator and `SaveMode.Append` can be different. We should revisit this when we have the new end-user write APIs.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-03 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23208

[SPARK-25530][SQL] data source v2 API refactor (batch write)

## What changes were proposed in this pull request?

Adjust the batch write API to match the read API refactor after 
https://github.com/apache/spark/pull/23086

Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite` and makes it extend `Table`. It also cleans up some code now that the batch API is complete.
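
To summarize the shape of the change, a before/after sketch of the hierarchy; the traits below are Scala stand-ins for the real Java interfaces, with method parameters omitted:

```scala
trait BatchWriteSupport

// Before: the write capability hangs off the source itself.
object BeforeRefactor {
  trait DataSourceV2
  trait BatchWriteSupportProvider extends DataSourceV2 {
    def createBatchWriteSupport(): BatchWriteSupport // parameters omitted
  }
}

// After: the write capability is a mix-in on Table, matching the read side.
object AfterRefactor {
  trait Table { def name(): String }
  trait SupportsBatchWrite extends Table {
    def createBatchWriteSupport(): BatchWriteSupport // parameters omitted
  }
}
```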

This PR also removes the test from https://github.com/apache/spark/pull/22688. Now a data source must return a table for read/write. It's a little awkward to use with the `SaveMode`-based write APIs, as users can append data to a non-existing table. `TableProvider` needs to return a `Table` instance with an empty schema if the table doesn't exist, so that we can write to it later. Hopefully we can remove the `SaveMode`-based write APIs after the new APIs are finished and widely used.

A few notes about future changes:
1. We will create `SupportsStreamingWrite` later for streaming APIs
2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think the streaming APIs will keep using `OutputMode`, and the new end-user write APIs will apply to batch only, at least in the near future.


## How was this patch tested?

existing tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark refactor-batch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23208.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23208


commit 00fc34fa793b922a48a4bf8e9f9cd0e3b688800b
Author: Wenchen Fan 
Date:   2018-12-03T14:38:43Z

data source v2 API refactor (batch write)




---
