[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2020-01-16 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r367291651
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
 ##
 @@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, V1Scan}
+import org.apache.spark.sql.execution.RowDataSourceScanExec
+import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, TableScan}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+abstract class V1ReadFallbackSuite extends QueryTest with SharedSparkSession {
+  protected def baseTableScan(): DataFrame
+
+  test("full scan") {
+    val df = baseTableScan()
+    val v1Scan = df.queryExecution.executedPlan.collect {
+      case s: RowDataSourceScanExec => s
+    }
+    assert(v1Scan.length == 1)
+    checkAnswer(df, Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+  }
+
+  test("column pruning") {
+    val df = baseTableScan().select("i")
+    val v1Scan = df.queryExecution.executedPlan.collect {
+      case s: RowDataSourceScanExec => s
+    }
+    assert(v1Scan.length == 1)
+    assert(v1Scan.head.output.map(_.name) == Seq("i"))
+    checkAnswer(df, Seq(Row(1), Row(2), Row(3)))
+  }
+
+  test("filter push down") {
+    val df = baseTableScan().filter("i > 1 and j < 30")
+    val v1Scan = df.queryExecution.executedPlan.collect {
+      case s: RowDataSourceScanExec => s
+    }
+    assert(v1Scan.length == 1)
+    // `j < 30` can't be pushed.
+    assert(v1Scan.head.handledFilters.size == 1)
+    checkAnswer(df, Seq(Row(2, 20)))
+  }
+
+  test("filter push down + column pruning") {
+    val df = baseTableScan().filter("i > 1").select("i")
+    val v1Scan = df.queryExecution.executedPlan.collect {
+      case s: RowDataSourceScanExec => s
+    }
+    assert(v1Scan.length == 1)
+    assert(v1Scan.head.output.map(_.name) == Seq("i"))
+    assert(v1Scan.head.handledFilters.size == 1)
+    checkAnswer(df, Seq(Row(2), Row(3)))
+  }
+}
+
+class V1ReadFallbackWithDataFrameReaderSuite extends V1ReadFallbackSuite {
+  override protected def baseTableScan(): DataFrame = {
+    spark.read.format(classOf[V1ReadFallbackTableProvider].getName).load()
+  }
+}
+
+class V1ReadFallbackWithCatalogSuite extends V1ReadFallbackSuite {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set("spark.sql.catalog.read_fallback", classOf[V1ReadFallbackCatalog].getName)
+    sql("CREATE TABLE read_fallback.tbl(i int, j int) USING foo")
+  }
+
+  override def afterAll(): Unit = {
+    spark.conf.unset("spark.sql.catalog.read_fallback")
+    super.afterAll()
+  }
+
+  override protected def baseTableScan(): DataFrame = {
+    spark.table("read_fallback.tbl")
+  }
+}
+
+class V1ReadFallbackCatalog extends BasicInMemoryTableCatalog {
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    // To simplify the test implementation, only support fixed schema.
+    if (schema != V1ReadFallbackCatalog.schema || partitions.nonEmpty) {
+      throw new UnsupportedOperationException
+    }
+    val table = new TableWithV1ReadFallback(ident.toString)
+    tables.put(ident, table)
+    table
+  }
+}
+
+object V1ReadFallbackCatalog {
+  val schema = new StructType().add("i", "int").add("j", "int")
+}
+
+class V1ReadFallbackTableProvider extends TableProvider {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new TableWithV1Rea

[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2020-01-16 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r367283434
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ##
 @@ -19,40 +19,72 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, Strategy}
+import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, 
ResolvedTable}
-import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, 
SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, 
NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetLocation, 
AlterNamespaceSetOwner, AlterNamespaceSetProperties, AlterTable, AppendData, 
CommentOnNamespace, CommentOnTable, CreateNamespace, CreateTableAsSelect, 
CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, 
DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, 
OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, 
ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, 
ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
-import org.apache.spark.sql.connector.catalog.{Identifier, 
StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, 
TableChange}
+import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, 
SupportsNamespaces, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.read.V1Scan
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, 
MicroBatchStream}
-import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, 
RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-object DataSourceV2Strategy extends Strategy with PredicateHelper {
+class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
 
   import DataSourceV2Implicits._
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
+  private def withProjectAndFilter(
+      project: Seq[NamedExpression],
+      filters: Seq[Expression],
+      scan: LeafExecNode,
+      needsUnsafeConversion: Boolean): SparkPlan = {
+    val filterCondition = filters.reduceLeftOption(And)
+    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+
+    if (withFilter.output != project || needsUnsafeConversion) {
+      ProjectExec(project, withFilter)
+    } else {
+      withFilter
+    }
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(project, filters,
+        relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) =>
+      val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG)
+        .getOrElse(Seq.empty)
+      val v1Relation = v1Scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
+      if (v1Relation.schema != v1Scan.readSchema()) {
+        throw new IllegalArgumentException(
+          "The fallback v1 relation reports inconsistent schema:\n" +
+            "Schema of v2 scan: " + v1Scan.readSchema() + "\n" +
+            "Schema of v1 relation: " + v1Relation.schema)
+      }
+      val rdd = v1Relation.buildScan()
+      val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
+      val originalOutputNames = relation.table.schema().map(_.name)
+      val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
+      val dsScan = RowDataSourceScanExec(
+        output,
+        requiredColumnsIndex,
+        pushedFilters.toSet,
 
 Review comment:
   After some more thought, I think we should only track the pushed filters.
   
   In data source v1, what we track is all the filters Spark pushes down, plus the ones accepted by the source, which is calculated as `all filters -- post-scan filters`. This is actually imprecise. For example, the parquet source can leverage the data filters but still needs Spark to evaluate the filters again, so they are post-scan filters and won't be treated as "accepted filters".
   
   In data source v2, we fix this by asking the source to report the pushed filters (via `SupportsPushDownFilters.pushedFilters`), and we only report the pushed filters in plan st
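
   To illustrate (not from this PR): a minimal Scala sketch, with hypothetical names, of a `ScanBuilder` that accepts only `GreaterThan` filters on column `i` and reports exactly those via `SupportsPushDownFilters.pushedFilters`:
   ```
   import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
   import org.apache.spark.sql.sources.{Filter, GreaterThan}

   // Hypothetical source: it can only handle `i > ...`, so it keeps those as
   // "pushed" and hands everything else back to Spark as post-scan filters.
   class ExampleScanBuilder extends ScanBuilder with SupportsPushDownFilters {
     private var pushed: Array[Filter] = Array.empty

     override def pushFilters(filters: Array[Filter]): Array[Filter] = {
       val (supported, unsupported) = filters.partition {
         case GreaterThan("i", _) => true
         case _ => false
       }
       pushed = supported
       unsupported // Spark must still evaluate these after the scan.
     }

     // Only the filters the source actually accepted are reported back to Spark.
     override def pushedFilters(): Array[Filter] = pushed

     override def build(): Scan = ??? // scan construction omitted in this sketch
   }
   ```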

[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2020-01-15 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r367230956
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ##
 @@ -19,40 +19,72 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, Strategy}
+import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, 
ResolvedTable}
-import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, 
SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, 
NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetLocation, 
AlterNamespaceSetOwner, AlterNamespaceSetProperties, AlterTable, AppendData, 
CommentOnNamespace, CommentOnTable, CreateNamespace, CreateTableAsSelect, 
CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, 
DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, 
OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, 
ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, 
ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
-import org.apache.spark.sql.connector.catalog.{Identifier, 
StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, 
TableChange}
+import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, 
SupportsNamespaces, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.read.V1Scan
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, 
MicroBatchStream}
-import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, 
RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-object DataSourceV2Strategy extends Strategy with PredicateHelper {
+class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
 
   import DataSourceV2Implicits._
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
+  private def withProjectAndFilter(
+      project: Seq[NamedExpression],
+      filters: Seq[Expression],
+      scan: LeafExecNode,
+      needsUnsafeConversion: Boolean): SparkPlan = {
+    val filterCondition = filters.reduceLeftOption(And)
+    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+
+    if (withFilter.output != project || needsUnsafeConversion) {
+      ProjectExec(project, withFilter)
+    } else {
+      withFilter
+    }
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(project, filters,
+        relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) =>
+      val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG)
+        .getOrElse(Seq.empty)
+      val v1Relation = v1Scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
+      if (v1Relation.schema != v1Scan.readSchema()) {
+        throw new IllegalArgumentException(
+          "The fallback v1 relation reports inconsistent schema:\n" +
+            "Schema of v2 scan: " + v1Scan.readSchema() + "\n" +
+            "Schema of v1 relation: " + v1Relation.schema)
+      }
+      val rdd = v1Relation.buildScan()
+      val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
+      val originalOutputNames = relation.table.schema().map(_.name)
+      val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
+      val dsScan = RowDataSourceScanExec(
+        output,
+        requiredColumnsIndex,
+        pushedFilters.toSet,
 
 Review comment:
   Yes, it should be the entire set of filters, but it's not a big deal. `RowDataSourceScanExec.filters` is only used in `toString`, to let people know which filters were pushed but not accepted by the source.
   
   Anyway, we can retain the full filter set the same way as the pushed filters. I'll fix it.


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2020-01-12 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r365645744
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ##
 @@ -19,38 +19,75 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, 
SubqueryExpression}
+import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, 
NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import 
org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, 
AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, 
DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, 
LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, 
RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, 
SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, 
ShowTableProperties, ShowTables}
 import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, 
TableCapability}
+import org.apache.spark.sql.connector.read.V1Scan
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, 
MicroBatchStream}
-import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, 
RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
+import org.apache.spark.sql.sources.TableScan
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-object DataSourceV2Strategy extends Strategy with PredicateHelper {
+class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
 
   import DataSourceV2Implicits._
 
+  private def withProjectAndFilter(
+      project: Seq[NamedExpression],
+      filters: Seq[Expression],
+      scan: LeafExecNode,
+      needsUnsafeConversion: Boolean): SparkPlan = {
+    val filterCondition = filters.reduceLeftOption(And)
+    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+
+    if (withFilter.output != project || needsUnsafeConversion) {
+      ProjectExec(project, withFilter)
+    } else {
+      withFilter
+    }
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(project, filters,
+        relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) =>
+      val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG)
+        .getOrElse(Seq.empty)
+      val v1Relation = v1Scan.toV1TableScan(session.sqlContext)
+      if (v1Relation.schema != v1Scan.readSchema()) {
+        throw new IllegalArgumentException(
+          "The fallback v1 relation reports inconsistent schema:\n" +
+            "Schema of v2 scan: " + v1Scan.readSchema() + "\n" +
+            "Schema of v1 relation: " + v1Relation.schema)
+      }
+      val rdd = v1Relation match {
+        case s: TableScan => s.buildScan()
+        case _ =>
+          throw new IllegalArgumentException(
+            "`V1Scan.toV1Relation` must return a `TableScan` instance.")
+      }
+      val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
+      val originalOutputNames = relation.table.schema().map(_.name)
+      val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
+      val dsScan = RowDataSourceScanExec(
 
 Review comment:
   A Scala auxiliary constructor must call another constructor as its FIRST action, e.g.
   ```
   def this(...) {
     // do something first
     this(...)
   }
   ```
   is not allowed.
   
   We can create an object and add an `apply` method to encapsulate the 4 lines of code, but does it really matter? The 4 lines of code only appear once here.
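
   For reference, a rough sketch (hypothetical names, not from this PR) of the companion-object `apply` alternative, which, unlike an auxiliary constructor, can run arbitrary code before invoking the primary constructor:
   ```
   // A plan-node-like case class whose construction needs a bit of preparation.
   case class ExampleScanNode(output: Seq[String], handledFilters: Set[String])

   object ExampleScanNode {
     // The "few lines" of preparation run here, before `new`.
     def apply(allColumns: Seq[String], required: Seq[String], pushed: Seq[String]): ExampleScanNode = {
       val output = allColumns.filter(required.contains)
       val handled = pushed.toSet
       new ExampleScanNode(output, handled)
     }
   }
   ```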


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2020-01-11 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r365563231
 
 

 ##
 File path: 
sql/core/src/main/java/org/apache/spark/sql/connector/read/V1Scan.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Unstable;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.sources.BaseRelation;
+import org.apache.spark.sql.sources.TableScan;
+
+/**
+ * A trait that should be implemented by V1 DataSources that would like to 
leverage the DataSource
+ * V2 read code paths.
+ *
+ * This interface is designed to provide Spark DataSources time to migrate to 
DataSource V2 and
+ * will be removed in a future Spark release.
+ *
+ * @since 3.0.0
+ */
+@Unstable
+public interface V1Scan extends Scan {
+
+  /**
+   * Creates a `BaseRelation` with `TableScan` that can scan data from DataSource v1 to `RDD[Row]`.
+   *
+   * @since 3.0.0
+   */
+  <T extends BaseRelation & TableScan> T toV1TableScan(SQLContext context);
 
 Review comment:
   We haven't marked `SQLContext` as deprecated yet.
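
   For context, a hedged Scala sketch (hypothetical names, empty data) of how a source could implement this interface; the anonymous relation satisfies the Java generic bound `BaseRelation & TableScan`:
   ```
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.{Row, SQLContext}
   import org.apache.spark.sql.connector.read.V1Scan
   import org.apache.spark.sql.sources.{BaseRelation, TableScan}
   import org.apache.spark.sql.types.StructType

   // Hypothetical scan that exposes an (empty) v1 relation through the fallback API.
   class ExampleV1Scan(tableSchema: StructType) extends V1Scan {
     override def readSchema(): StructType = tableSchema

     // Scala-side override of the Java generic bound <T extends BaseRelation & TableScan>.
     override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
       val relation = new BaseRelation with TableScan {
         override def sqlContext: SQLContext = context
         override def schema: StructType = tableSchema
         override def buildScan(): RDD[Row] = context.sparkContext.emptyRDD[Row]
       }
       relation.asInstanceOf[T]
     }
   }
   ```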


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2020-01-11 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r365563206
 
 

 ##
 File path: 
sql/core/src/main/java/org/apache/spark/sql/connector/read/V1Scan.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Unstable;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.sources.BaseRelation;
+import org.apache.spark.sql.sources.TableScan;
+
+/**
+ * A trait that should be implemented by V1 DataSources that would like to 
leverage the DataSource
+ * V2 read code paths.
+ *
+ * This interface is designed to provide Spark DataSources time to migrate to 
DataSource V2 and
+ * will be removed in a future Spark release.
+ *
+ * @since 3.0.0
+ */
+@Unstable
+public interface V1Scan extends Scan {
 
 Review comment:
   The idea is the same as the v1 write fallback API, which also relies on the v2 API to configure the write. It's better to leverage the v2 infra as much as we can, e.g. we may improve v2 pushdown to push down more operators than v1 supports.


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2019-12-24 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r361102981
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ##
 @@ -19,38 +19,75 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, 
SubqueryExpression}
+import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, 
NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import 
org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, 
AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, 
DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, 
LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, 
RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, 
SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, 
ShowTableProperties, ShowTables}
 import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, 
TableCapability}
+import org.apache.spark.sql.connector.read.V1Scan
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, 
MicroBatchStream}
-import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, 
RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
+import org.apache.spark.sql.sources.TableScan
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-object DataSourceV2Strategy extends Strategy with PredicateHelper {
+class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
 
   import DataSourceV2Implicits._
 
+  private def withProjectAndFilter(
+      project: Seq[NamedExpression],
+      filters: Seq[Expression],
+      scan: LeafExecNode,
+      needsUnsafeConversion: Boolean): SparkPlan = {
+    val filterCondition = filters.reduceLeftOption(And)
+    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+
+    if (withFilter.output != project || needsUnsafeConversion) {
+      ProjectExec(project, withFilter)
+    } else {
+      withFilter
+    }
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(project, filters,
+        relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) =>
+      val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG)
+        .getOrElse(Seq.empty)
+      val v1Relation = v1Scan.toV1TableScan(session.sqlContext)
+      if (v1Relation.schema != v1Scan.readSchema()) {
+        throw new IllegalArgumentException(
+          "The fallback v1 relation reports inconsistent schema:\n" +
+            "Schema of v2 scan: " + v1Scan.readSchema() + "\n" +
+            "Schema of v1 relation: " + v1Relation.schema)
+      }
+      val rdd = v1Relation match {
+        case s: TableScan => s.buildScan()
+        case _ =>
+          throw new IllegalArgumentException(
+            "`V1Scan.toV1Relation` must return a `TableScan` instance.")
+      }
+      val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
+      val originalOutputNames = relation.table.schema().map(_.name)
+      val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
+      val dsScan = RowDataSourceScanExec(
 
 Review comment:
   I tried, and it's not easy to refactor. The query plans in Spark are all case classes, and you can't create a subclass of a case class. Besides, only a few lines of code could be encapsulated, and the part that gets the v1 scan should belong to the strategy.
   
   If you have some ideas, please help and send a PR to my branch. I'm happy to 
see approaches that can make the code cleaner.
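
   (For illustration only, a tiny sketch of the restriction; names are made up and the error text is paraphrased:)
   ```
   case class BaseScanNode(output: Seq[String])

   // Does not compile: case-to-case inheritance is prohibited, roughly
   // "case class FallbackScanNode has case ancestor BaseScanNode, ...".
   // case class FallbackScanNode(output: Seq[String], extra: String) extends BaseScanNode(output)
   ```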


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2019-12-24 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r361101534
 
 

 ##
 File path: 
sql/core/src/main/java/org/apache/spark/sql/connector/write/V1WriteBuilder.java
 ##
 @@ -44,11 +41,5 @@
*
* @since 3.0.0
*/
-  def buildForV1Write(): InsertableRelation
-
-  // These methods cannot be implemented by a V1WriteBuilder. The super class will throw
-  // an UnsupportedOperationException.
-  override final def buildForBatch(): BatchWrite = super.buildForBatch()
 
 Review comment:
   A Java interface can't have `final` methods.


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2019-12-22 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r360763960
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ##
 @@ -19,38 +19,75 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, 
SubqueryExpression}
+import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, 
NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import 
org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, 
AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, 
DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, 
LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, 
RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, 
SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, 
ShowTableProperties, ShowTables}
 import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, 
TableCapability}
+import org.apache.spark.sql.connector.read.V1Scan
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, 
MicroBatchStream}
-import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, 
RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
+import org.apache.spark.sql.sources.TableScan
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-object DataSourceV2Strategy extends Strategy with PredicateHelper {
+class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
 
   import DataSourceV2Implicits._
 
+  private def withProjectAndFilter(
+      project: Seq[NamedExpression],
+      filters: Seq[Expression],
+      scan: LeafExecNode,
+      needsUnsafeConversion: Boolean): SparkPlan = {
+    val filterCondition = filters.reduceLeftOption(And)
+    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+
+    if (withFilter.output != project || needsUnsafeConversion) {
+      ProjectExec(project, withFilter)
+    } else {
+      withFilter
+    }
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(project, filters,
+        relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) =>
+      val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG)
+        .getOrElse(Seq.empty)
+      val v1Relation = v1Scan.toV1TableScan(session.sqlContext)
+      if (v1Relation.schema != v1Scan.readSchema()) {
+        throw new IllegalArgumentException(
+          "The fallback v1 relation reports inconsistent schema:\n" +
+            "Schema of v2 scan: " + v1Scan.readSchema() + "\n" +
+            "Schema of v1 relation: " + v1Relation.schema)
+      }
+      val rdd = v1Relation match {
+        case s: TableScan => s.buildScan()
+        case _ =>
+          throw new IllegalArgumentException(
+            "`V1Scan.toV1Relation` must return a `TableScan` instance.")
+      }
+      val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
+      val originalOutputNames = relation.table.schema().map(_.name)
+      val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
+      val dsScan = RowDataSourceScanExec(
 
 Review comment:
   This may be a good idea, but it's what we do for DS v1 (see the v1 rule `DataSourceStrategy`). I'd like to avoid changing the existing design choice we made before.


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2019-12-22 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r360763763
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/connector/read/V1Scan.scala
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read
+
+import org.apache.spark.annotation.{Experimental, Unstable}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.BaseRelation
+
+/**
+ * A trait that should be implemented by V1 DataSources that would like to 
leverage the DataSource
+ * V2 read code paths.
+ *
+ * This interface is designed to provide Spark DataSources time to migrate to 
DataSource V2 and
+ * will be removed in a future Spark release.
+ *
+ * @since 3.0.0
+ */
+@Experimental
+@Unstable
+trait V1Scan extends Scan {
+
+  /**
+   * Creates a `BaseRelation` that can scan data from DataSource v1 to `RDD[Row]`.
+   * The returned relation must be a `TableScan` instance.
 
 Review comment:
   I wrote it in Scala to follow `V1WriteBuilder`. Whether these fallback traits should be written in Java or Scala is another topic; I followed the existing choice to be consistent.


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2019-12-22 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r360763577
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/connector/read/V1Scan.scala
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read
+
+import org.apache.spark.annotation.{Experimental, Unstable}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.BaseRelation
+
+/**
+ * A trait that should be implemented by V1 DataSources that would like to 
leverage the DataSource
+ * V2 read code paths.
+ *
+ * This interface is designed to provide Spark DataSources time to migrate to 
DataSource V2 and
+ * will be removed in a future Spark release.
+ *
+ * @since 3.0.0
+ */
+@Experimental
+@Unstable
+trait V1Scan extends Scan {
+
+  /**
+   * Creates a `BaseRelation` that can scan data from DataSource v1 to `RDD[Row]`.
+   * The returned relation must be a `TableScan` instance.
 
 Review comment:
   `TableScan` is a mixin trait; what we need is `BaseRelation with TableScan`, but that doesn't work well in Java.
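
   A minimal Scala sketch of the shape the fallback expects (hypothetical name; the schema and rows just mirror the data used in this PR's test suite):
   ```
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.{Row, SQLContext}
   import org.apache.spark.sql.sources.{BaseRelation, TableScan}
   import org.apache.spark.sql.types.StructType

   // A relation that is both a BaseRelation and a TableScan, which is what the
   // fallback needs to hand back to Spark.
   class ExampleV1Relation(override val sqlContext: SQLContext) extends BaseRelation with TableScan {
     override def schema: StructType = new StructType().add("i", "int").add("j", "int")

     override def buildScan(): RDD[Row] =
       sqlContext.sparkContext.parallelize(Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
   }
   ```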


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2019-11-01 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r341575272
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
 ##
 @@ -51,6 +54,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
  """.stripMargin)
 
   val scanRelation = DataSourceV2ScanRelation(relation.table, scan, output)
+  scanRelation.setTagValue(PUSHED_FILTERS_TAG, pushedFilters)
 
 Review comment:
   It would be convenient if `Scan` could report pushed filters itself, but I'm not sure how to design the API to make it work.
   
   Here I just store the pushed filters in the `DataSourceV2ScanRelation`, so that I can use them when creating the v1 physical scan node later, which needs `pushedFilters` to do the equality check.
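
   For reference, the tag mechanism looks roughly like this (the tag name string and object name here are illustrative, not necessarily what the PR uses):
   ```
   import org.apache.spark.sql.catalyst.trees.TreeNodeTag
   import org.apache.spark.sql.sources.Filter

   object PushedFiltersTag {
     // Carries the filters that were pushed to the source, attached to the plan node.
     val PUSHED_FILTERS_TAG = TreeNodeTag[Seq[Filter]]("pushedFilters")
   }

   // Pushdown rule:  scanRelation.setTagValue(PushedFiltersTag.PUSHED_FILTERS_TAG, pushedFilters)
   // Strategy:       relation.getTagValue(PushedFiltersTag.PUSHED_FILTERS_TAG).getOrElse(Seq.empty)
   ```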


[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2

2019-10-23 Thread GitBox
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add 
v1 read fallback API in DS v2
URL: https://github.com/apache/spark/pull/26231#discussion_r338106338
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 ##
 @@ -97,17 +97,14 @@ trait DataSourceScanExec extends LeafExecNode {
 
 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
-    fullOutput: Seq[Attribute],
-    requiredColumnsIndex: Seq[Int],
+    output: Seq[Attribute],
 
 Review comment:
   Since I need to use `RowDataSourceScanExec` in the new read fallback code path, I simplified it a little to make it easier to use.

