[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r367291651 ## File path: sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession, SQLContext} +import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, Table, TableCapability, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, V1Scan} +import org.apache.spark.sql.execution.RowDataSourceScanExec +import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, TableScan} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +abstract class V1ReadFallbackSuite extends QueryTest with SharedSparkSession { + protected def baseTableScan(): DataFrame + + test("full scan") { +val df = baseTableScan() +val v1Scan = df.queryExecution.executedPlan.collect { + case s: RowDataSourceScanExec => s +} +assert(v1Scan.length == 1) +checkAnswer(df, Seq(Row(1, 10), Row(2, 20), Row(3, 30))) + } + + test("column pruning") { +val df = baseTableScan().select("i") +val v1Scan = df.queryExecution.executedPlan.collect { + case s: RowDataSourceScanExec => s +} +assert(v1Scan.length == 1) +assert(v1Scan.head.output.map(_.name) == Seq("i")) +checkAnswer(df, Seq(Row(1), Row(2), Row(3))) + } + + test("filter push down") { +val df = baseTableScan().filter("i > 1 and j < 30") +val v1Scan = df.queryExecution.executedPlan.collect { + case s: RowDataSourceScanExec => s +} +assert(v1Scan.length == 1) +// `j < 30` can't be pushed. 
+assert(v1Scan.head.handledFilters.size == 1) +checkAnswer(df, Seq(Row(2, 20))) + } + + test("filter push down + column pruning") { +val df = baseTableScan().filter("i > 1").select("i") +val v1Scan = df.queryExecution.executedPlan.collect { + case s: RowDataSourceScanExec => s +} +assert(v1Scan.length == 1) +assert(v1Scan.head.output.map(_.name) == Seq("i")) +assert(v1Scan.head.handledFilters.size == 1) +checkAnswer(df, Seq(Row(2), Row(3))) + } +} + +class V1ReadFallbackWithDataFrameReaderSuite extends V1ReadFallbackSuite { + override protected def baseTableScan(): DataFrame = { +spark.read.format(classOf[V1ReadFallbackTableProvider].getName).load() + } +} + +class V1ReadFallbackWithCatalogSuite extends V1ReadFallbackSuite { + override def beforeAll(): Unit = { +super.beforeAll() +spark.conf.set("spark.sql.catalog.read_fallback", classOf[V1ReadFallbackCatalog].getName) +sql("CREATE TABLE read_fallback.tbl(i int, j int) USING foo") + } + + override def afterAll(): Unit = { +spark.conf.unset("spark.sql.catalog.read_fallback") +super.afterAll() + } + + override protected def baseTableScan(): DataFrame = { +spark.table("read_fallback.tbl") + } +} + +class V1ReadFallbackCatalog extends BasicInMemoryTableCatalog { + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { +// To simplify the test implementation, only support fixed schema. +if (schema != V1ReadFallbackCatalog.schema || partitions.nonEmpty) { + throw new UnsupportedOperationException +} +val table = new TableWithV1ReadFallback(ident.toString) +tables.put(ident, table) +table + } +} + +object V1ReadFallbackCatalog { + val schema = new StructType().add("i", "int").add("j", "int") +} + +class V1ReadFallbackTableProvider extends TableProvider { + override def getTable(options: CaseInsensitiveStringMap): Table = { +new TableWithV1Rea
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r367283434 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ## @@ -19,40 +19,72 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, Strategy} +import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable} -import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetLocation, AlterNamespaceSetOwner, AlterNamespaceSetProperties, AlterTable, AppendData, CommentOnNamespace, CommentOnTable, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables} -import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange} +import org.apache.spark.sql.connector.read.V1Scan import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import 
org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} import org.apache.spark.sql.util.CaseInsensitiveStringMap -object DataSourceV2Strategy extends Strategy with PredicateHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + private def withProjectAndFilter( + project: Seq[NamedExpression], + filters: Seq[Expression], + scan: LeafExecNode, + needsUnsafeConversion: Boolean): SparkPlan = { +val filterCondition = filters.reduceLeftOption(And) +val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + +if (withFilter.output != project || needsUnsafeConversion) { + ProjectExec(project, withFilter) +} else { + withFilter +} + } + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(project, filters, +relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) => + val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG) +.getOrElse(Seq.empty) + val v1Relation = v1Scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext) + if (v1Relation.schema != v1Scan.readSchema()) { +throw new IllegalArgumentException( + "The fallback v1 relation reports inconsistent schema:\n" + +"Schema of v2 scan: " + v1Scan.readSchema() + "\n" + +"Schema of v1 relation: " + v1Relation.schema) + } + val rdd = v1Relation.buildScan() + val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd) + val originalOutputNames = relation.table.schema().map(_.name) + val requiredColumnsIndex = 
output.map(_.name).map(originalOutputNames.indexOf) + val dsScan = RowDataSourceScanExec( +output, +requiredColumnsIndex, +pushedFilters.toSet, Review comment: After some more thought, I think we should only track the pushed filters. In data source v1, what we track is all the filters Spark pushes down, and the ones accepted by the source, which is calculated by `all filters -- post-scan filters`. This is actually imprecise. For example, the parquet source can leverage the data filters but still needs Spark to evaluate the filter again, so they are post-scan filters and won't be treated as "accepted filters". In data source v2, we fix it by asking the source to report the pushed filters (via `SupportsPushDownFilters.pushedFilters`), and we only report the pushed filters in plan st
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r367283434 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ## @@ -19,40 +19,72 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, Strategy} +import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable} -import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetLocation, AlterNamespaceSetOwner, AlterNamespaceSetProperties, AlterTable, AppendData, CommentOnNamespace, CommentOnTable, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables} -import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange} +import org.apache.spark.sql.connector.read.V1Scan import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import 
org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} import org.apache.spark.sql.util.CaseInsensitiveStringMap -object DataSourceV2Strategy extends Strategy with PredicateHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + private def withProjectAndFilter( + project: Seq[NamedExpression], + filters: Seq[Expression], + scan: LeafExecNode, + needsUnsafeConversion: Boolean): SparkPlan = { +val filterCondition = filters.reduceLeftOption(And) +val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + +if (withFilter.output != project || needsUnsafeConversion) { + ProjectExec(project, withFilter) +} else { + withFilter +} + } + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(project, filters, +relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) => + val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG) +.getOrElse(Seq.empty) + val v1Relation = v1Scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext) + if (v1Relation.schema != v1Scan.readSchema()) { +throw new IllegalArgumentException( + "The fallback v1 relation reports inconsistent schema:\n" + +"Schema of v2 scan: " + v1Scan.readSchema() + "\n" + +"Schema of v1 relation: " + v1Relation.schema) + } + val rdd = v1Relation.buildScan() + val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd) + val originalOutputNames = relation.table.schema().map(_.name) + val requiredColumnsIndex = 
output.map(_.name).map(originalOutputNames.indexOf) + val dsScan = RowDataSourceScanExec( +output, +requiredColumnsIndex, +pushedFilters.toSet, Review comment: After some more thought, I think we should only track the pushed filters. In data source v1, what we track is all the filters Spark pushes down, and the ones accepted by the source, which is calculated by `all filters -- post-scan filters`. This is actually imprecise. For example, the parquet source can leverage the data filters but still needs Spark to evaluate the filter again, so they are post-scan filters and won't be treated as "accepted filters". In data source v2, we fix it by asking the source to report the pushed filters (via `SupportsPushDownFilters.pushedFilters`), and we only report the pushed filters in plan st
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r367230956 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ## @@ -19,40 +19,72 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, Strategy} +import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable} -import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetLocation, AlterNamespaceSetOwner, AlterNamespaceSetProperties, AlterTable, AppendData, CommentOnNamespace, CommentOnTable, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables} -import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange} +import org.apache.spark.sql.connector.read.V1Scan import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import 
org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} import org.apache.spark.sql.util.CaseInsensitiveStringMap -object DataSourceV2Strategy extends Strategy with PredicateHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + private def withProjectAndFilter( + project: Seq[NamedExpression], + filters: Seq[Expression], + scan: LeafExecNode, + needsUnsafeConversion: Boolean): SparkPlan = { +val filterCondition = filters.reduceLeftOption(And) +val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + +if (withFilter.output != project || needsUnsafeConversion) { + ProjectExec(project, withFilter) +} else { + withFilter +} + } + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(project, filters, +relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) => + val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG) +.getOrElse(Seq.empty) + val v1Relation = v1Scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext) + if (v1Relation.schema != v1Scan.readSchema()) { +throw new IllegalArgumentException( + "The fallback v1 relation reports inconsistent schema:\n" + +"Schema of v2 scan: " + v1Scan.readSchema() + "\n" + +"Schema of v1 relation: " + v1Relation.schema) + } + val rdd = v1Relation.buildScan() + val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd) + val originalOutputNames = relation.table.schema().map(_.name) + val requiredColumnsIndex = 
output.map(_.name).map(originalOutputNames.indexOf) + val dsScan = RowDataSourceScanExec( +output, +requiredColumnsIndex, +pushedFilters.toSet, Review comment: Yes it should be the entire set of filters, but it's not a big deal. `RowDataSourceScanExec.filters` is only used in `toString`, to let people know which filters are pushed but not accepted by the source. Anyway we can retain the full filter set like the pushed filters, I'll fix it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Servic
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r365645744 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ## @@ -19,38 +19,75 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} +import org.apache.spark.sql.connector.read.V1Scan import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources.TableScan import 
org.apache.spark.sql.util.CaseInsensitiveStringMap -object DataSourceV2Strategy extends Strategy with PredicateHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { import DataSourceV2Implicits._ + private def withProjectAndFilter( + project: Seq[NamedExpression], + filters: Seq[Expression], + scan: LeafExecNode, + needsUnsafeConversion: Boolean): SparkPlan = { +val filterCondition = filters.reduceLeftOption(And) +val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + +if (withFilter.output != project || needsUnsafeConversion) { + ProjectExec(project, withFilter) +} else { + withFilter +} + } + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(project, filters, +relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) => + val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG) +.getOrElse(Seq.empty) + val v1Relation = v1Scan.toV1TableScan(session.sqlContext) + if (v1Relation.schema != v1Scan.readSchema()) { +throw new IllegalArgumentException( + "The fallback v1 relation reports inconsistent schema:\n" + +"Schema of v2 scan: " + v1Scan.readSchema() + "\n" + +"Schema of v1 relation: " + v1Relation.schema) + } + val rdd = v1Relation match { +case s: TableScan => s.buildScan() +case _ => + throw new IllegalArgumentException( +"`V1Scan.toV1Relation` must return a `TableScan` instance.") + } + val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd) + val originalOutputNames = relation.table.schema().map(_.name) + val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf) + val dsScan = RowDataSourceScanExec( Review comment: The Scala auxiliary constructors must call other constructors as the FIRST action, e.g. ``` def this(...) { do something this(...) } ``` is not allowed. We can create an object and add an `apply` method to encapsulate the 4 lines of code, but does it really matter? 
The 4 lines of code only appear once here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r365563231 ## File path: sql/core/src/main/java/org/apache/spark/sql/connector/read/V1Scan.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Unstable; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.sources.BaseRelation; +import org.apache.spark.sql.sources.TableScan; + +/** + * A trait that should be implemented by V1 DataSources that would like to leverage the DataSource + * V2 read code paths. + * + * This interface is designed to provide Spark DataSources time to migrate to DataSource V2 and + * will be removed in a future Spark release. + * + * @since 3.0.0 + */ +@Unstable +public interface V1Scan extends Scan { + + /** + * Creates an `BaseRelation` with `TableScan` that can scan data from DataSource v1 to RDD[Row]. + * + * @since 3.0.0 + */ + <T extends BaseRelation & TableScan> T toV1TableScan(SQLContext context); Review comment: We haven't marked `SQLContext` as deprecated yet. This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r365563206 ## File path: sql/core/src/main/java/org/apache/spark/sql/connector/read/V1Scan.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Unstable; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.sources.BaseRelation; +import org.apache.spark.sql.sources.TableScan; + +/** + * A trait that should be implemented by V1 DataSources that would like to leverage the DataSource + * V2 read code paths. + * + * This interface is designed to provide Spark DataSources time to migrate to DataSource V2 and + * will be removed in a future Spark release. + * + * @since 3.0.0 + */ +@Unstable +public interface V1Scan extends Scan { Review comment: The idea is the same as for the v1 write fallback API. The v1 write fallback API also relies on the v2 API to configure the write. It's better to leverage the v2 infra as much as we can. For example, we may improve the v2 pushdown to push more operators that v1 doesn't support.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r361102981 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ## @@ -19,38 +19,75 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} +import org.apache.spark.sql.connector.read.V1Scan import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources.TableScan import 
org.apache.spark.sql.util.CaseInsensitiveStringMap -object DataSourceV2Strategy extends Strategy with PredicateHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { import DataSourceV2Implicits._ + private def withProjectAndFilter( + project: Seq[NamedExpression], + filters: Seq[Expression], + scan: LeafExecNode, + needsUnsafeConversion: Boolean): SparkPlan = { +val filterCondition = filters.reduceLeftOption(And) +val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + +if (withFilter.output != project || needsUnsafeConversion) { + ProjectExec(project, withFilter) +} else { + withFilter +} + } + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(project, filters, +relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) => + val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG) +.getOrElse(Seq.empty) + val v1Relation = v1Scan.toV1TableScan(session.sqlContext) + if (v1Relation.schema != v1Scan.readSchema()) { +throw new IllegalArgumentException( + "The fallback v1 relation reports inconsistent schema:\n" + +"Schema of v2 scan: " + v1Scan.readSchema() + "\n" + +"Schema of v1 relation: " + v1Relation.schema) + } + val rdd = v1Relation match { +case s: TableScan => s.buildScan() +case _ => + throw new IllegalArgumentException( +"`V1Scan.toV1Relation` must return a `TableScan` instance.") + } + val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd) + val originalOutputNames = relation.table.schema().map(_.name) + val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf) + val dsScan = RowDataSourceScanExec( Review comment: I tried and it's not easy to refactor. The query plans in Spark are all case classes, and you can't create a subclass of a case class. Also, only a few lines of code could be encapsulated; the part that gets the v1 scan should belong to the strategy.
If you have some ideas, please help and send a PR to my branch. I'm happy to see approaches that can make the code cleaner. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r361102981 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ## @@ -19,38 +19,75 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} +import org.apache.spark.sql.connector.read.V1Scan import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources.TableScan import 
org.apache.spark.sql.util.CaseInsensitiveStringMap -object DataSourceV2Strategy extends Strategy with PredicateHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { import DataSourceV2Implicits._ + private def withProjectAndFilter( + project: Seq[NamedExpression], + filters: Seq[Expression], + scan: LeafExecNode, + needsUnsafeConversion: Boolean): SparkPlan = { +val filterCondition = filters.reduceLeftOption(And) +val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + +if (withFilter.output != project || needsUnsafeConversion) { + ProjectExec(project, withFilter) +} else { + withFilter +} + } + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(project, filters, +relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) => + val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG) +.getOrElse(Seq.empty) + val v1Relation = v1Scan.toV1TableScan(session.sqlContext) + if (v1Relation.schema != v1Scan.readSchema()) { +throw new IllegalArgumentException( + "The fallback v1 relation reports inconsistent schema:\n" + +"Schema of v2 scan: " + v1Scan.readSchema() + "\n" + +"Schema of v1 relation: " + v1Relation.schema) + } + val rdd = v1Relation match { +case s: TableScan => s.buildScan() +case _ => + throw new IllegalArgumentException( +"`V1Scan.toV1Relation` must return a `TableScan` instance.") + } + val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd) + val originalOutputNames = relation.table.schema().map(_.name) + val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf) + val dsScan = RowDataSourceScanExec( Review comment: I tried and it's not easy to refactor. The query plans in Spark are all case class and you can't create a sub-class. And it's only a few lines of code that can be encapsulated, the part of getting the v1 scan should belong to the strategy. 
If you have some ideas, please help and send a PR to my branch. I'm happy to see approaches that can make the code cleaner. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r361101534 ## File path: sql/core/src/main/java/org/apache/spark/sql/connector/write/V1WriteBuilder.java ## @@ -44,11 +41,5 @@ * * @since 3.0.0 */ - def buildForV1Write(): InsertableRelation - - // These methods cannot be implemented by a V1WriteBuilder. The super class will throw - // an Unsupported OperationException - override final def buildForBatch(): BatchWrite = super.buildForBatch() Review comment: A Java interface can't have `final` methods. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r360763960 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ## @@ -19,38 +19,75 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} +import org.apache.spark.sql.connector.read.V1Scan import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources.TableScan import 
org.apache.spark.sql.util.CaseInsensitiveStringMap -object DataSourceV2Strategy extends Strategy with PredicateHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { import DataSourceV2Implicits._ + private def withProjectAndFilter( + project: Seq[NamedExpression], + filters: Seq[Expression], + scan: LeafExecNode, + needsUnsafeConversion: Boolean): SparkPlan = { +val filterCondition = filters.reduceLeftOption(And) +val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + +if (withFilter.output != project || needsUnsafeConversion) { + ProjectExec(project, withFilter) +} else { + withFilter +} + } + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(project, filters, +relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) => + val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG) +.getOrElse(Seq.empty) + val v1Relation = v1Scan.toV1TableScan(session.sqlContext) + if (v1Relation.schema != v1Scan.readSchema()) { +throw new IllegalArgumentException( + "The fallback v1 relation reports inconsistent schema:\n" + +"Schema of v2 scan: " + v1Scan.readSchema() + "\n" + +"Schema of v1 relation: " + v1Relation.schema) + } + val rdd = v1Relation match { +case s: TableScan => s.buildScan() +case _ => + throw new IllegalArgumentException( +"`V1Scan.toV1Relation` must return a `TableScan` instance.") + } + val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd) + val originalOutputNames = relation.table.schema().map(_.name) + val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf) + val dsScan = RowDataSourceScanExec( Review comment: This may be a good idea, but this is what we do for ds v1 (see the v1 rule `DataSourceStrategy`). I'd like to avoid changing the existing design choice we made before. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r360763763 ## File path: sql/core/src/main/scala/org/apache/spark/sql/connector/read/V1Scan.scala ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read + +import org.apache.spark.annotation.{Experimental, Unstable} +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.sources.BaseRelation + +/** + * A trait that should be implemented by V1 DataSources that would like to leverage the DataSource + * V2 read code paths. + * + * This interface is designed to provide Spark DataSources time to migrate to DataSource V2 and + * will be removed in a future Spark release. + * + * @since 3.0.0 + */ +@Experimental +@Unstable +trait V1Scan extends Scan { + + /** + * Creates an `BaseRelation` that can scan data from DataSource v1 to RDD[Row]. The returned + * relation must be a `TableScan` instance. Review comment: I write it in Scala to follow `V1WriteBuilder`. It's another topic if we should write these fallback traits in Java or Scala, I follow the existing choice to be consistent. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r360763577 ## File path: sql/core/src/main/scala/org/apache/spark/sql/connector/read/V1Scan.scala ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read + +import org.apache.spark.annotation.{Experimental, Unstable} +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.sources.BaseRelation + +/** + * A trait that should be implemented by V1 DataSources that would like to leverage the DataSource + * V2 read code paths. + * + * This interface is designed to provide Spark DataSources time to migrate to DataSource V2 and + * will be removed in a future Spark release. + * + * @since 3.0.0 + */ +@Experimental +@Unstable +trait V1Scan extends Scan { + + /** + * Creates an `BaseRelation` that can scan data from DataSource v1 to RDD[Row]. The returned + * relation must be a `TableScan` instance. Review comment: `TableScan` is a mixin trait, what we need is `BaseRelation with TableScan`, but that doesn't work well for java. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r341575272 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ## @@ -51,6 +54,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] { """.stripMargin) val scanRelation = DataSourceV2ScanRelation(relation.table, scan, output) + scanRelation.setTagValue(PUSHED_FILTERS_TAG, pushedFilters) Review comment: It would be convenient if `Scan` could report pushed filters itself, but I'm not sure how to design the API to make that work. Here I just store the pushed filters in the `DataSourceV2ScanRelation`, so that I can use them when creating the v1 physical scan node later, which needs `pushedFilters` for its equality check. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r341575272 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala ## @@ -51,6 +54,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] { """.stripMargin) val scanRelation = DataSourceV2ScanRelation(relation.table, scan, output) + scanRelation.setTagValue(PUSHED_FILTERS_TAG, pushedFilters) Review comment: It would be convenient if `Scan` could report pushed filters itself, but I don't know how to design the API to make that work. Here I just store the pushed filters in the `DataSourceV2ScanRelation`, so that I can use them when creating the v1 physical scan node later, which needs `pushedFilters` for its equality check. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r338106338 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ## @@ -97,17 +97,14 @@ trait DataSourceScanExec extends LeafExecNode { /** Physical plan node for scanning data from a relation. */ case class RowDataSourceScanExec( -fullOutput: Seq[Attribute], -requiredColumnsIndex: Seq[Int], +output: Seq[Attribute], Review comment: Since I need to use `RowDataSourceScanExec` in the new read fallback code path, I simplified it a little bit to make it easier to use. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org