[jira] [Commented] (SPARK-25530) data source v2 API refactor (batch write)
[ https://issues.apache.org/jira/browse/SPARK-25530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717830#comment-16717830 ]

ASF GitHub Bot commented on SPARK-25530:

rdblue commented on a change in pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (batch write)
URL: https://github.com/apache/spark/pull/23208#discussion_r240756984

## File path: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java

@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. Implementations must
  * have a public, 0-arg constructor.
  *
- * The major responsibility of this interface is to return a {@link Table} for read/write.
+ * The major responsibility of this interface is to return a {@link Table} for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
+ * table schema can be empty in this case.

Review comment:
I suggest we use the v1 file source as the basis for v2's behavior. You can see an implementation of that behavior in my other comment. If the table exists, overwrite is a dynamic partition overwrite, append is an append, and ignore does nothing. If the table doesn't exist, the operation is a CTAS. (Note that we can also check table properties to correctly mirror the behavior of static overwrite.)

Your concern is addressed by not using the `Append` plan when the file source would have needed to create the table.

The critical difference is that this behavior is implemented entirely in Spark instead of passing `SaveMode` to the source. If you pass `SaveMode` to the source, Spark can't guarantee that the behavior is consistent across sources, and we are trying to fix inconsistent behavior in v2.
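To make the proposed semantics concrete, here is a minimal Scala sketch of the decision table rdblue describes, with the mode resolved by Spark rather than passed to the source. The `Action` type and `resolveSaveMode` helper are illustrative names invented for this sketch, not Spark API and not code from the PR:

    import org.apache.spark.sql.SaveMode

    // v1 file-source semantics as a pure decision function, so Spark (not each
    // source) owns the behavior. Everything except SaveMode itself is made up.
    sealed trait Action
    case object DoNothing extends Action                 // ignore on an existing table
    case object Append extends Action                    // plain append to existing data
    case object DynamicPartitionOverwrite extends Action // replace only touched partitions
    case object CreateTableAsSelect extends Action       // CTAS when the table is missing
    case object FailTableExists extends Action           // error-if-exists on an existing table

    def resolveSaveMode(tableExists: Boolean, mode: SaveMode): Action =
      (tableExists, mode) match {
        case (false, _)                     => CreateTableAsSelect
        case (true, SaveMode.Append)        => Append
        case (true, SaveMode.Overwrite)     => DynamicPartitionOverwrite
        case (true, SaveMode.Ignore)        => DoNothing
        case (true, SaveMode.ErrorIfExists) => FailTableExists
      }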
[jira] [Commented] (SPARK-25530) data source v2 API refactor (batch write)
[ https://issues.apache.org/jira/browse/SPARK-25530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715953#comment-16715953 ]

ASF GitHub Bot commented on SPARK-25530:

cloud-fan commented on a change in pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (batch write)
URL: https://github.com/apache/spark/pull/23208#discussion_r240448233

## File path: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
(same TableProvider.java hunk as quoted above)

Review comment:
What about the file source's behavior difference between `SaveMode.Append` and the new append operator? Are you saying we should accept the change and ask users to update their code? The file source is widely used through the `df.write.save` API...
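For context, the end-user call path in question looks like this; with the v1 file source, `SaveMode.Append` against a path that does not yet exist silently creates it, whereas a strict append-only v2 plan with no create fallback would presumably have to fail. The output path is illustrative:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.range(10).toDF("id")

    // v1 file source: creates /tmp/spark-append-demo if missing, then appends.
    df.write
      .format("parquet")
      .mode(SaveMode.Append)
      .save("/tmp/spark-append-demo")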
[jira] [Commented] (SPARK-25530) data source v2 API refactor (batch write)
[ https://issues.apache.org/jira/browse/SPARK-25530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715618#comment-16715618 ]

ASF GitHub Bot commented on SPARK-25530:

rdblue commented on a change in pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (batch write)
URL: https://github.com/apache/spark/pull/23208#discussion_r240394365

## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotBucketed("save")

-    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
-    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-      source match {
-        case provider: BatchWriteSupportProvider =>
-          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-            source,
-            df.sparkSession.sessionState.conf)
-          val options = sessionOptions ++ extraOptions
-
+    val session = df.sparkSession
+    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
+    if (classOf[TableProvider].isAssignableFrom(cls)) {
+      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+        provider, session.sessionState.conf)
+      val options = sessionOptions ++ extraOptions
+      val dsOptions = new DataSourceOptions(options.asJava)
+      provider.getTable(dsOptions) match {
+        case table: SupportsBatchWrite =>
+          val relation = DataSourceV2Relation.create(table, dsOptions)
+          // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`.
+          // We should create new end-users APIs for the `AppendData` operator.

Review comment:
I see no reason to make this API depend on migrating the file source. We know that `SaveMode` must be removed. It makes no sense to create a broken file source implementation and then remove it afterward.
[jira] [Commented] (SPARK-25530) data source v2 API refactor (batch write)
[ https://issues.apache.org/jira/browse/SPARK-25530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715622#comment-16715622 ]

ASF GitHub Bot commented on SPARK-25530:

rdblue commented on a change in pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (batch write)
URL: https://github.com/apache/spark/pull/23208#discussion_r240394580

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

@@ -17,52 +17,49 @@

 package org.apache.spark.sql.execution.datasources.v2

-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}

 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType

 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
+ *                and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-    // TODO: remove `source` when we finish API refactor for write.
-    source: TableProvider,
-    table: SupportsBatchRead,
+    table: Table,
     output: Seq[AttributeReference],
-    options: Map[String, String],
-    userSpecifiedSchema: Option[StructType] = None)
+    // TODO: use a simple case insensitive map instead.
+    options: DataSourceOptions)
   extends LeafNode with MultiInstanceRelation with NamedRelation {

-  import DataSourceV2Relation._
-
   override def name: String = table.name()

   override def simpleString: String = {
     s"RelationV2${truncatedString(output, "[", ", ", "]")} $name"
   }

-  def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema)
-
-  def newScanBuilder(): ScanBuilder = {
-    val dsOptions = new DataSourceOptions(options.asJava)
-    table.newScanBuilder(dsOptions)
+  def newWriteSupport(inputSchema: StructType, mode: SaveMode): Optional[BatchWriteSupport] = {

Review comment:
I would hold off on this discussion for now. I think this is going to require significant changes.
[jira] [Commented] (SPARK-25530) data source v2 API refactor (batch write)
[ https://issues.apache.org/jira/browse/SPARK-25530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715617#comment-16715617 ]

ASF GitHub Bot commented on SPARK-25530:

rdblue commented on a change in pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (batch write)
URL: https://github.com/apache/spark/pull/23208#discussion_r240394058

## File path: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
(same TableProvider.java hunk as quoted above)

Review comment:
`SaveMode` is incompatible with the SPIP to standardize behavior, which was voted on and accepted. The save mode in `DataFrameWriter` must be used to create v2 plans with well-defined behavior, and it cannot be passed to implementations in the final version of the v2 read/write API.

I see no reason to put off removing `SaveMode` from the API. If we remove it now, we will avoid shipping more versions of this API that are **fundamentally broken**, and we will avoid more implementations that rely on it without being aware that it will be removed.

To your point about whether this is safe: the only cases where `SaveMode` is actually used are `SaveMode.Overwrite` and `SaveMode.Append`. To replace those, all that needs to happen is to define what kind of overwrite should happen here (dynamic or truncate). I can supply the logical plan and physical implementation in a follow-up PR, because I already have all of this written and ready to go in. Or, I can open a PR to merge first if you'd like these changes to depend on that implementation.
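The one open decision rdblue flags is which overwrite `SaveMode.Overwrite` should map to. A sketch of that choice is below; the `OverwriteKind` type is invented for illustration, and the option key only borrows the name of the existing `spark.sql.sources.partitionOverwriteMode` conf, so neither is the API the follow-up PR actually proposes:

    // Illustrative only: deciding whether SaveMode.Overwrite means a truncate
    // (static) overwrite or a dynamic partition overwrite. The default here
    // follows the file-source behavior described in the comments above.
    sealed trait OverwriteKind
    case object TruncateOverwrite extends OverwriteKind // drop all existing data first
    case object DynamicOverwrite extends OverwriteKind  // replace only partitions being written

    def overwriteKind(options: Map[String, String]): OverwriteKind =
      options.get("partitionOverwriteMode").map(_.toLowerCase) match {
        case Some("static") => TruncateOverwrite
        case _              => DynamicOverwrite
      }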
[jira] [Commented] (SPARK-25530) data source v2 API refactor (batch write)
[ https://issues.apache.org/jira/browse/SPARK-25530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707365#comment-16707365 ]

Apache Spark commented on SPARK-25530:
--------------------------------------

User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/23208
[jira] [Commented] (SPARK-25530) data source v2 API refactor (batch write)
[ https://issues.apache.org/jira/browse/SPARK-25530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707361#comment-16707361 ]

Apache Spark commented on SPARK-25530:
--------------------------------------

User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/23208

> data source v2 API refactor (batch write)
> -----------------------------------------
>
>                 Key: SPARK-25530
>                 URL: https://issues.apache.org/jira/browse/SPARK-25530
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Wenchen Fan
>            Priority: Major
>
> Adjust the batch write API to match the read API after refactor