[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-10-15 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r335262702
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
 ##
 @@ -36,26 +35,12 @@
 public interface TableProvider {
 
   /**
-   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   * Return a {@link Table} instance with the given table options to do 
read/write.
+   * Implementations should infer the table schema and partitioning.
*
* @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
*topic name, etc. It's an immutable case-insensitive 
string-to-string map.
*/
+  // TODO: this should take a Map as table properties.
 
 Review comment:
   > I don't think that partition inference needs to scan the entire file 
system tree.
   
   Spark needs to do it to get all the partition values and infer the schema. 
This is an existing feature that Spark can infer a common type for partition 
values with different types. The same applies to schema inference as well. 
Spark can read parquet files of different but compatible schema, so Spark must 
read all the files to infer the schema.
   
   Can you share more about the static cache? Do you mean a global cache that 
maps a directory to its listed files?
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-10-15 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r334951921
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
 ##
 @@ -35,9 +38,12 @@ class CSVDataSourceV2 extends FileDataSourceV2 {
 CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
-  override def getTable(options: CaseInsensitiveStringMap, schema: 
StructType): Table = {
-val paths = getPaths(options)
+  override def getTable(
+  schema: StructType,
+  partitions: Array[Transform],
+  properties: util.Map[String, String]): Table = {
+val paths = getPaths(properties)
 val tableName = getTableName(paths)
-CSVTable(tableName, sparkSession, options, paths, Some(schema), 
fallbackFileFormat)
+CSVTable(tableName, sparkSession, properties, paths, Some(schema), 
fallbackFileFormat)
 
 Review comment:
   We shouldn't make this PR bigger and supports partitioning in file source. 
For now let's explicitly say that file source doesn't support user-specified 
partitioning: 
https://github.com/apache/spark/pull/25651/files#diff-3d5fde6e98a2856ba1b00ce6f172c4a8R72


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-10-08 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r332613323
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
 ##
 @@ -51,38 +52,36 @@ private[sql] case class V1Table(v1Table: CatalogTable) 
extends Table {
 }
   }
 
-  def catalogTable: CatalogTable = v1Table
-
   lazy val options: Map[String, String] = {
-v1Table.storage.locationUri match {
+catalogTable.storage.locationUri match {
   case Some(uri) =>
-v1Table.storage.properties + ("path" -> uri.toString)
+catalogTable.storage.properties + ("path" -> uri.toString)
   case _ =>
-v1Table.storage.properties
+catalogTable.storage.properties
 }
   }
 
-  override lazy val properties: util.Map[String, String] = 
v1Table.properties.asJava
+  override lazy val properties: util.Map[String, String] = 
catalogTable.properties.asJava
 
 Review comment:
   I leave the options/properties unchanged here, but we need to figure it out 
later. Currently there are 2 directions:
   1. We have table options and table properties. Table options are special 
table properties with "option." prefix in its name. Table options will be 
extracted and used as scan/write options.
   2. We only have table properties. The OPTIONS clause in CREATE TABLE should 
be the same as the TBLPROPERTIES clause.
   
   We can have more discussion about it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-10-07 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r332095472
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
 ##
 @@ -36,26 +35,12 @@
 public interface TableProvider {
 
   /**
-   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   * Return a {@link Table} instance with the given table options to do 
read/write.
+   * Implementations should infer the table schema and partitioning.
*
* @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
*topic name, etc. It's an immutable case-insensitive 
string-to-string map.
*/
+  // TODO: this should take a Map as table properties.
 
 Review comment:
   It turns out that this is more complicated than I thought.
   
   For file source, we need to list the files to scan. If we need to infer the 
schema/partitioning, we will list the files a little earlier, and reuse the 
listed files when scan.
   
   That said, we need to either make `TableProvider` stateful, so that file 
source can keep the listed files after schema inference. Or we need to 
re-design the API so that it can carry states after schema inference.
   
   Maybe we should move on with 2 `getTable` methods as it is in this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-10-07 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r331886955
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
 ##
 @@ -36,26 +35,12 @@
 public interface TableProvider {
 
   /**
-   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   * Return a {@link Table} instance with the given table options to do 
read/write.
+   * Implementations should infer the table schema and partitioning.
*
* @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
*topic name, etc. It's an immutable case-insensitive 
string-to-string map.
*/
+  // TODO: this should take a Map as table properties.
 
 Review comment:
   Then API is limited to the assumption that schema inference depends on 
partition inference. This is true for file source, but I'm not sure if it's 
true to all data sources.
   
   Shall we stick to the current proposal that has 2 `getTable` methods?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-10-07 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r331886955
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
 ##
 @@ -36,26 +35,12 @@
 public interface TableProvider {
 
   /**
-   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   * Return a {@link Table} instance with the given table options to do 
read/write.
+   * Implementations should infer the table schema and partitioning.
*
* @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
*topic name, etc. It's an immutable case-insensitive 
string-to-string map.
*/
+  // TODO: this should take a Map as table properties.
 
 Review comment:
   Then API is limited to the assumption that schema inference depends on 
partition inference. This is true for file source, but I'm not sure if it's 
true to all data sources.
   
   Shall we stick to the current proposal that has 2 `getTable` methods?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-10-03 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r331014385
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
 ##
 @@ -36,26 +35,12 @@
 public interface TableProvider {
 
   /**
-   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   * Return a {@link Table} instance with the given table options to do 
read/write.
+   * Implementations should infer the table schema and partitioning.
*
* @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
*topic name, etc. It's an immutable case-insensitive 
string-to-string map.
*/
+  // TODO: this should take a Map as table properties.
 
 Review comment:
   I did a quick try, and found one problem: it's very likely that a data 
source needs to infer schema and partition together, e.g. file source. If we 
have 2 separated method `inferSchema` and `inferPartitioning`, it may force the 
data source to do expensive inference twice.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-27 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r329298155
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CatalogExtensionForTableProvider.scala
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, 
Identifier, SupportsSpecifiedSchemaPartitioning, Table}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class CatalogExtensionForTableProvider extends DelegatingCatalogExtension {
+
+  private val conf = SQLConf.get
+
+  override def loadTable(ident: Identifier): Table = {
+val table = super.loadTable(ident)
+tryResolveTableProvider(table)
+  }
+
+  override def createTable(
+  ident: Identifier,
+  schema: StructType,
+  partitions: Array[Transform],
+  properties: util.Map[String, String]): Table = {
+val provider = properties.getOrDefault("provider", 
conf.defaultDataSourceName)
+val maybeProvider = DataSource.lookupDataSourceV2(provider, conf)
+val (actualSchema, actualPartitioning) = if (maybeProvider.isDefined && 
schema.isEmpty) {
+  // A sanity check. The parser should guarantee it.
+  assert(partitions.isEmpty)
+  // If `CREATE TABLE ... USING` does not specify table metadata, get the 
table metadata from
+  // data source first.
+  val table = maybeProvider.get.getTable(new 
CaseInsensitiveStringMap(properties))
+  table.schema() -> table.partitioning()
+} else {
+  schema -> partitions
+}
+super.createTable(ident, actualSchema, actualPartitioning, properties)
+// call `loadTable` to make sure the schema/partitioning specified in 
`CREATE TABLE ... USING`
+// matches the actual data schema/partitioning. If error happens during 
table loading, drop
+// the table.
+try {
+  loadTable(ident)
+} catch {
+  case NonFatal(e) =>
+dropTable(ident)
+throw e
+}
+  }
+
+  private def tryResolveTableProvider(table: Table): Table = {
+val providerName = table.properties().get("provider")
+assert(providerName != null)
+DataSource.lookupDataSourceV2(providerName, conf).map {
+  // TODO: support file source v2 in CREATE TABLE USING.
+  case _: FileDataSourceV2 => table
 
 Review comment:
   yup


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-26 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r328626270
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CatalogExtensionForTableProvider.scala
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, 
Identifier, SupportsSpecifiedSchemaPartitioning, Table}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class CatalogExtensionForTableProvider extends DelegatingCatalogExtension {
+
+  private val conf = SQLConf.get
+
+  override def loadTable(ident: Identifier): Table = {
+val table = super.loadTable(ident)
+tryResolveTableProvider(table)
+  }
+
+  override def createTable(
+  ident: Identifier,
+  schema: StructType,
+  partitions: Array[Transform],
+  properties: util.Map[String, String]): Table = {
+val provider = properties.getOrDefault("provider", 
conf.defaultDataSourceName)
+val maybeProvider = DataSource.lookupDataSourceV2(provider, conf)
+val (actualSchema, actualPartitioning) = if (maybeProvider.isDefined && 
schema.isEmpty) {
+  // A sanity check. The parser should guarantee it.
+  assert(partitions.isEmpty)
+  // If `CREATE TABLE ... USING` does not specify table metadata, get the 
table metadata from
+  // data source first.
+  val table = maybeProvider.get.getTable(new 
CaseInsensitiveStringMap(properties))
+  table.schema() -> table.partitioning()
 
 Review comment:
   There are 2 "table" in this context:
   1. the table entry in Spark metastore
   2. the table returned by `TableProvider`
   
   For example, `CREATE TABLE abc USING jdbc OPTIONS(table='xyz')`, `abc` is 
the table entry in Spark metastore, `xyz` is the table returned by 
`TableProvider`. The table entry in Spark metastore is simply a link to the 
table returned by `TableProvider`. So the CREATE TABLE here is not to create a 
table in JDBC, but to create a table in Spark metastore that links to the JDBC 
table.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-26 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r328624180
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CatalogExtensionForTableProvider.scala
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, 
Identifier, SupportsSpecifiedSchemaPartitioning, Table}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class CatalogExtensionForTableProvider extends DelegatingCatalogExtension {
+
+  private val conf = SQLConf.get
+
+  override def loadTable(ident: Identifier): Table = {
 
 Review comment:
   I did struggle for it many times. If we put this logic in 
`V2SessionCatalog`, then the catalog extension API becomes less useful. If a 
user creates a `CustomExtension` and tries to overwrite the `loadTable` method, 
then there is not much he can do as the table provider resolving is already 
done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-26 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r328621570
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
 ##
 @@ -31,14 +31,15 @@ import org.apache.spark.sql.types.StructType
  * An implementation of catalog v2 `Table` to expose v1 table metadata.
  */
 private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
+  assert(v1Table.provider.isDefined)
 
 Review comment:
   Only view has no provider. So this is guaranteed. Please see the parameter 
doc of `CatalogTable#provider`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-25 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r328442663
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CatalogExtensionForTableProvider.scala
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, 
Identifier, SupportsSpecifiedSchemaPartitioning, Table}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class CatalogExtensionForTableProvider extends DelegatingCatalogExtension {
+
+  private val conf = SQLConf.get
+
+  override def loadTable(ident: Identifier): Table = {
+val table = super.loadTable(ident)
+tryResolveTableProvider(table)
+  }
+
+  override def createTable(
+  ident: Identifier,
+  schema: StructType,
+  partitions: Array[Transform],
+  properties: util.Map[String, String]): Table = {
+val provider = properties.getOrDefault("provider", 
conf.defaultDataSourceName)
+val maybeProvider = DataSource.lookupDataSourceV2(provider, conf)
+val (actualSchema, actualPartitioning) = if (maybeProvider.isDefined && 
schema.isEmpty) {
+  // A sanity check. The parser should guarantee it.
+  assert(partitions.isEmpty)
+  // If `CREATE TABLE ... USING` does not specify table metadata, get the 
table metadata from
+  // data source first.
+  val table = maybeProvider.get.getTable(new 
CaseInsensitiveStringMap(properties))
+  table.schema() -> table.partitioning()
+} else {
+  schema -> partitions
+}
+super.createTable(ident, actualSchema, actualPartitioning, properties)
+// call `loadTable` to make sure the schema/partitioning specified in 
`CREATE TABLE ... USING`
+// matches the actual data schema/partitioning. If error happens during 
table loading, drop
+// the table.
+try {
+  loadTable(ident)
+} catch {
+  case NonFatal(e) =>
+dropTable(ident)
+throw e
+}
+  }
+
+  private def tryResolveTableProvider(table: Table): Table = {
+val providerName = table.properties().get("provider")
+assert(providerName != null)
+DataSource.lookupDataSourceV2(providerName, conf).map {
+  // TODO: support file source v2 in CREATE TABLE USING.
+  case _: FileDataSourceV2 => table
 
 Review comment:
   As you already found out in 
https://github.com/apache/spark/pull/25651#discussion_r328336988
   
   File source v2 can't take the partitioning from metastore because the 
`TableProvider` API was incompleted before. I don't want to fix file source v2 
in this PR, so I simply ignore it here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-25 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r328441721
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CatalogExtensionForTableProvider.scala
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, 
Identifier, SupportsSpecifiedSchemaPartitioning, Table}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class CatalogExtensionForTableProvider extends DelegatingCatalogExtension {
+
+  private val conf = SQLConf.get
+
+  override def loadTable(ident: Identifier): Table = {
+val table = super.loadTable(ident)
+tryResolveTableProvider(table)
+  }
+
+  override def createTable(
+  ident: Identifier,
+  schema: StructType,
+  partitions: Array[Transform],
+  properties: util.Map[String, String]): Table = {
+val provider = properties.getOrDefault("provider", 
conf.defaultDataSourceName)
+val maybeProvider = DataSource.lookupDataSourceV2(provider, conf)
+val (actualSchema, actualPartitioning) = if (maybeProvider.isDefined && 
schema.isEmpty) {
+  // A sanity check. The parser should guarantee it.
+  assert(partitions.isEmpty)
+  // If `CREATE TABLE ... USING` does not specify table metadata, get the 
table metadata from
+  // data source first.
+  val table = maybeProvider.get.getTable(new 
CaseInsensitiveStringMap(properties))
+  table.schema() -> table.partitioning()
+} else {
+  schema -> partitions
+}
+super.createTable(ident, actualSchema, actualPartitioning, properties)
+// call `loadTable` to make sure the schema/partitioning specified in 
`CREATE TABLE ... USING`
+// matches the actual data schema/partitioning. If error happens during 
table loading, drop
+// the table.
 
 Review comment:
   e.g. `CREATE TABLE t(i int) USING jdbc OPTIONS (table=t2)`. It's possible 
that the JDBC table `t2` has a different schema.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-25 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r328441296
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
 ##
 @@ -57,4 +60,21 @@ private[sql] object DataSourceV2Utils extends Logging {
   case _ => Map.empty
 }
   }
+
+  def loadTableWithUserSpecifiedSchema(
+  provider: TableProvider,
+  schema: StructType,
+  options: CaseInsensitiveStringMap): Table = {
+provider match {
+  case s: SupportsSpecifiedSchemaPartitioning =>
+// TODO: `DataFrameReader`/`DataStreamReader` should have an API to 
set user-specified
+//   partitioning.
 
 Review comment:
   But there is no API to specify the partitioning here, neither 
`DataFrameReader` or `DataStreamReader`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-25 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r328440980
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
 ##
 @@ -36,26 +35,12 @@
 public interface TableProvider {
 
   /**
-   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   * Return a {@link Table} instance with the given table options to do 
read/write.
+   * Implementations should infer the table schema and partitioning.
*
* @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
*topic name, etc. It's an immutable case-insensitive 
string-to-string map.
*/
+  // TODO: this should take a Map as table properties.
 
 Review comment:
   SGTM. Shall we do it in this PR? We need to update all the `TableProvider` 
implementations, i.e. file source v2, streaming source v2, testing v2 sources.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-25 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r328440703
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
 ##
 @@ -35,9 +38,12 @@ class CSVDataSourceV2 extends FileDataSourceV2 {
 CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
-  override def getTable(options: CaseInsensitiveStringMap, schema: 
StructType): Table = {
-val paths = getPaths(options)
+  override def getTable(
+  schema: StructType,
+  partitions: Array[Transform],
 
 Review comment:
   shall we also update `TableCatalog.createTable`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r325731939
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 ##
 @@ -66,7 +105,11 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: 
SQLConf)
 throw new NoSuchTableException(ident)
 }
 
-V1Table(catalogTable)
+if (catalogTable.tableType == CatalogTableType.VIEW) {
+  throw new NoSuchTableException(ident)
+}
+
+tryResolveTableProvider(V1Table(catalogTable))
 
 Review comment:
   This is the core change of this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r325731939
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 ##
 @@ -66,7 +105,11 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: 
SQLConf)
 throw new NoSuchTableException(ident)
 }
 
-V1Table(catalogTable)
+if (catalogTable.tableType == CatalogTableType.VIEW) {
+  throw new NoSuchTableException(ident)
+}
+
+tryResolveTableProvider(V1Table(catalogTable))
 
 Review comment:
   This is the major change of this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r325731051
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java
 ##
 @@ -36,26 +40,21 @@
 public interface TableProvider {
 
   /**
-   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   * Return a {@link Table} instance to do read/write with the given table 
metadata. The returned
+   * table must report the same schema and partitioning with the given table 
metadata.
*
-   * @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
-   *topic name, etc. It's an immutable case-insensitive 
string-to-string map.
-   */
-  Table getTable(CaseInsensitiveStringMap options);
-
-  /**
-   * Return a {@link Table} instance to do read/write with user-specified 
schema and options.
-   * 
-   * By default this method throws {@link UnsupportedOperationException}, 
implementations should
-   * override this method to handle user-specified schema.
-   * 
-   * @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
-   *topic name, etc. It's an immutable case-insensitive 
string-to-string map.
-   * @param schema the user-specified schema.
-   * @throws UnsupportedOperationException
+   * @param schema The schema of the table to load. If it's empty, 
implementations should infer it.
+   * @param partitions The data partitioning of the table to load. If it's 
empty, implementations
+   *   should infer it.
+   * @param properties The properties of the table to load. It should be 
sufficient to define and
+   *   access a table. The properties map may be {@link 
CaseInsensitiveStringMap}.
+   *
+   * @throws IllegalArgumentException if the implementation can't infer 
schema/partitioning, or
+   *  the given schema/partitioning doesn't 
match the actual data
+   *  schema/partitioning.
*/
-  default Table getTable(CaseInsensitiveStringMap options, StructType schema) {
-throw new UnsupportedOperationException(
-  this.getClass().getSimpleName() + " source does not support 
user-specified schema");
-  }
+  Table getTable(
+  Optional schema,
+  Optional partitions,
+  Map properties);
 
 Review comment:
   I'd like to discuss how the API should look like. The current use cases 
include
   1. users only specify options, implementation needs to infer 
schema/partitioning
   2. users specify options and schema, implementation needs to infer 
partitioning
   3. users specify all the things.
   
   Shall we create 3 methods or just create one single method like this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-12 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r323808715
 
 

 ##
 File path: 
external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
 ##
 @@ -35,7 +36,10 @@ class AvroDataSourceV2 extends FileDataSourceV2 {
 AvroTable(tableName, sparkSession, options, paths, None, 
fallbackFileFormat)
   }
 
-  override def getTable(options: CaseInsensitiveStringMap, schema: 
StructType): Table = {
+  override def getTable(
+  options: CaseInsensitiveStringMap,
+  schema: StructType,
+  partitions: Array[Transform]): Table = {
 
 Review comment:
   Or we can make table properties case insensitive.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] Support passing all Table metadata in TableProvider

2019-09-12 Thread GitBox
cloud-fan commented on a change in pull request #25651: [SPARK-28948][SQL] 
Support passing all Table metadata in TableProvider
URL: https://github.com/apache/spark/pull/25651#discussion_r323808342
 
 

 ##
 File path: 
external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
 ##
 @@ -35,7 +36,10 @@ class AvroDataSourceV2 extends FileDataSourceV2 {
 AvroTable(tableName, sparkSession, options, paths, None, 
fallbackFileFormat)
   }
 
-  override def getTable(options: CaseInsensitiveStringMap, schema: 
StructType): Table = {
+  override def getTable(
+  options: CaseInsensitiveStringMap,
+  schema: StructType,
+  partitions: Array[Transform]): Table = {
 
 Review comment:
   But we do have a problem here. Table properties are case sensitive while 
scan options are case insensitive.
   
   Think about 2 cases:
   1. `spark.read.format("myFormat").options(...).schema(...).load()`.
   We need to get the table with the user-specifed options and schema. When 
scan the table, we need to use the user-specified options as scan options. The 
problem is, `DataFrameReader.options` specifies both table properties and scan 
options in this case.
   2. `CREATE TABLE t USING myFormat TABLEPROP ...` and then 
`spark.read.options(...).table("t")`
   In this case, `DataFrameReader.options` only specifies scan options.
   
   Ideally, `TableProvider.getTable` takes table properties which should be 
case sensitive. However, `DataFrameReader.options` also specifies scan options 
which should be case insensitive.
   
   I don't have a good idea now. Maybe it's OK to treat this as a special table 
which accepts case insensitive table properties.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org