[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r204264532

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1191,6 +1191,20 @@ object SQLConf {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefault(100)
+  val DISABLED_V2_FILE_DATA_SOURCE_READERS = buildConf("spark.sql.disabledV2FileDataSourceReaders")
--- End diff --

Hm, are we going to leave this out for 2.4 and aim to migrate completely in 3.0? Do you guys also plan to migrate them within 2.4? It looks like there's not much gain from this change alone when the release is this close.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
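The diff only shows the key `spark.sql.disabledV2FileDataSourceReaders` being declared with `buildConf`. As a rough illustration of what such a switch could do, here is a plain-Scala sketch that assumes the value is a comma-separated list of data source short names (for example `orc,parquet`). `DisabledReadersSketch`, `parseDisabled` and `useV2Reader` are hypothetical helpers, not Spark APIs; a listed source would presumably be read through its V1 `FileFormat` instead of the V2 reader.

```scala
import java.util.Locale

// Hypothetical helpers: they assume the conf value is a comma-separated list of
// data source short names. They are not part of Spark's SQLConf API.
object DisabledReadersSketch {
  // "orc, Parquet ," -> Set("orc", "parquet"); blank entries are ignored.
  def parseDisabled(confValue: String): Set[String] =
    confValue.split(",").iterator
      .map(_.trim.toLowerCase(Locale.ROOT))
      .filter(_.nonEmpty)
      .toSet

  // The V2 reader is used only when its short name is not listed as disabled;
  // otherwise the caller would fall back to the source's V1 FileFormat.
  def useV2Reader(shortName: String, confValue: String): Boolean =
    !parseDisabled(confValue).contains(shortName.trim.toLowerCase(Locale.ROOT))
}
```

For example, `useV2Reader("parquet", "orc,parquet")` is `false`, so under this assumption Parquet reads would take the V1 path, while an empty value leaves every V2 reader enabled.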
Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r185215070

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -213,6 +215,26 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
   }
 }
+/**
+ * Replace the V2 data source of table in [[InsertIntoTable]] to V1 [[FileFormat]].
+ * E.g., with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
+ * since there is no corresponding physical plan.
+ * This is a temporary hack for making current data source V2 work.
--- End diff --

This is definitely temporary. We have to add this, otherwise we can't migrate file data sources to V2. I believe @cloud-fan has a plan for getting rid of this.
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r185055954

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -213,6 +215,26 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
   }
 }
+/**
+ * Replace the V2 data source of table in [[InsertIntoTable]] to V1 [[FileFormat]].
+ * E.g., with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
+ * since there is no corresponding physical plan.
+ * This is a temporary hack for making current data source V2 work.
--- End diff --

I don't like the idea of "temporary hack" rules. What is the long-term plan for getting rid of this? Or is this something we will end up supporting forever?
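To make the rule under discussion concrete: it rewrites the target of an `InsertIntoTable` from a V2 file source to that source's V1 `FileFormat` and leaves every other node alone. The sketch below models only that rewrite on a toy plan. The case classes are hypothetical stand-ins for Spark's `LogicalPlan`, `InsertIntoTable` and `FileDataSourceV2`, and the V1 format is identified by a class-name string rather than a `Class[_ <: FileFormat]`.

```scala
// Toy model only: these case classes are hypothetical stand-ins, not Spark's
// LogicalPlan, InsertIntoTable, FileDataSourceV2 or FileFormat classes.
object FallbackRuleSketch {
  sealed trait Relation
  // A V2 file source that knows which V1 format it can fall back to.
  final case class V2FileRelation(shortName: String, fallBackFormat: String) extends Relation
  // A V1 relation backed by a FileFormat, named by class name here.
  final case class V1FileRelation(format: String) extends Relation

  sealed trait Plan
  final case class InsertIntoTable(table: Relation, query: String) extends Plan
  final case class Other(description: String) extends Plan

  // Only the insert target is rewritten; every other plan node passes through.
  def fallBackToV1(plan: Plan): Plan = plan match {
    case InsertIntoTable(V2FileRelation(_, fallBack), query) =>
      InsertIntoTable(V1FileRelation(fallBack), query)
    case other => other
  }
}
```

Applying `fallBackToV1` to an insert into `V2FileRelation("parquet", "ParquetFileFormat")` yields an insert into `V1FileRelation("ParquetFileFormat")`. The toy model leaves out tree traversal, which a real `Rule[LogicalPlan]` would do with `transform`.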
Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183611332

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2
+
+import java.util.{List => JList, Optional}
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetTest}
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 with ReadSupport {
+  class DummyFileReader extends DataSourceReader {
+    override def readSchema(): StructType = {
+      throw new AnalysisException("hehe")
+    }
+
+    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
+      java.util.Arrays.asList()
+  }
+
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
+    throw new AnalysisException("Dummy file reader")
+  }
+
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+}
+
+class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 with WriteSupport {
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+
+  override def createWriter(
+      jobId: String,
+      schema: StructType,
+      mode: SaveMode,
+      options: DataSourceOptions): Optional[DataSourceWriter] = {
+    throw new AnalysisException("Dummy file writer")
+  }
+}
+
+class SimpleFileDataSourceV2 extends SimpleDataSourceV2 with FileDataSourceV2 {
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+}
+
+class FileDataSourceV2Suite extends QueryTest with ParquetTest with SharedSQLContext {
+  class DummyFileWriter extends DataSourceWriter {
+    override def createWriterFactory(): DataWriterFactory[Row] = {
+      throw new AnalysisException("hehe")
+    }
+
+    override def commit(messages: Array[WriterCommitMessage]): Unit = {}
+
+    override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+  }
+
+  private val dummyParquetReaderV2 = classOf[DummyReadOnlyFileDataSourceV2].getName
+  private val dummyParquetWriterV2 = classOf[DummyWriteOnlyFileDataSourceV2].getName
+  private val simpleFileDataSourceV2 = classOf[SimpleFileDataSourceV2].getName
+  private val parquetV1 = classOf[ParquetFileFormat].getCanonicalName
+
+  test("Fall back to v1 when writing to file with read only FileDataSourceV2") {
+    val df = spark.range(1, 10).toDF()
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+      // Writing file should fall back to v1 and succeed.
+      df.write.format(dummyParquetReaderV2).save(path)
+
+      // Validate write result with [[ParquetFileFormat]].
+      checkAnswer(spark.read.format(parquetV1).load(path), df)
+
+      // Dummy File reader should fail as expected.
+      val exception = intercept[AnalysisException] {
+        spark.read.format(dummyParquetReaderV2).load(path)
+      }
+      assert(exception.message.equals("Dummy file reader"))
+    }
+  }
+
+  test("Fall back
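The first test exercises capability-based fallback: `DummyReadOnlyFileDataSourceV2` mixes in `ReadSupport` but not `WriteSupport`, so a write is expected to go through `fallBackFileFormat` (Parquet V1) while a read is expected to hit the dummy V2 reader and fail. Below is a minimal plain-Scala model of that dispatch, assuming the decision depends only on which support trait the source mixes in. All names here are hypothetical; they mirror the roles of the Spark classes but are not those classes.

```scala
// Toy model of capability-based fallback; these traits are hypothetical and
// only mirror the roles of FileDataSourceV2, ReadSupport and WriteSupport.
object CapabilityFallbackSketch {
  trait FileSourceV2 { def fallBackFileFormat: String }
  trait ReadSupport
  trait WriteSupport

  sealed trait Route
  final case class UseV2(source: FileSourceV2) extends Route
  final case class UseV1(fileFormat: String) extends Route

  // Reads use the V2 source only if it mixes in ReadSupport.
  def routeRead(source: FileSourceV2): Route = source match {
    case _: ReadSupport => UseV2(source)
    case _              => UseV1(source.fallBackFileFormat)
  }

  // Writes use the V2 source only if it mixes in WriteSupport.
  def routeWrite(source: FileSourceV2): Route = source match {
    case _: WriteSupport => UseV2(source)
    case _               => UseV1(source.fallBackFileFormat)
  }

  // Counterparts of the dummy read-only and write-only sources in the suite.
  object ReadOnly extends FileSourceV2 with ReadSupport {
    val fallBackFileFormat: String = "ParquetFileFormat"
  }
  object WriteOnly extends FileSourceV2 with WriteSupport {
    val fallBackFileFormat: String = "ParquetFileFormat"
  }
}
```

In this model `routeWrite(ReadOnly)` gives `UseV1("ParquetFileFormat")`, which corresponds to the test's expectation that the write succeeds through V1, while `routeRead(ReadOnly)` picks the V2 source, which in the suite throws "Dummy file reader".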
Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183611071

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala ---
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183589976

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala ---
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183589841

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala ---
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183589906

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala ---
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183589673

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala ---
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183589528

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala ---
+      // Dummy File reader should fail as expected.
--- End diff --

`Dummy File reader should be picked and fail as expected`
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183589427

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala ---
@@ -0,0 +1,179 @@
[...]
+  private val dummyParquetReaderV2 = classOf[DummyReadOnlyFileDataSourceV2].getName
+  private val dummyParquetWriterV2 = classOf[DummyWriteOnlyFileDataSourceV2].getName
+  private val simpleFileDataSourceV2 = classOf[SimpleFileDataSourceV2].getName
+  private val parquetV1 = classOf[ParquetFileFormat].getCanonicalName
+
+  test("Fall back to v1 when writing to file with read only FileDataSourceV2") {
+    val df = spark.range(1, 10).toDF()
--- End diff --

nit: `val df = spark.range(10)`
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183589226

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala ---
@@ -0,0 +1,179 @@
[...]
+class SimpleFileDataSourceV2 extends SimpleDataSourceV2 with FileDataSourceV2 {
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+}
+
+class FileDataSourceV2Suite extends QueryTest with ParquetTest with SharedSQLContext {
--- End diff --

`FileDataSourceV2FallbackSuite`
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183589146

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala ---
@@ -158,6 +158,7 @@ abstract class BaseSessionStateBuilder(
     override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
       new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
+        new FallBackFileDataSourceToV1(session) +:
--- End diff --

We also need to add this rule to `HiveSessionStateBuilder.analyzer`.
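Why the rule has to be registered in two places can be illustrated with a toy model. The classes below are hypothetical and the rules are plain name strings; `ResolveHiveSerdeTable` is assumed here as the Hive-specific rule. The premise, which is our reading of the Spark 2.x sources rather than something stated in this thread, is that the Hive builder defines its own analyzer rule list instead of extending the base builder's, so a rule prepended in `BaseSessionStateBuilder` is not inherited.

```scala
// Toy model with hypothetical classes: each builder exposes the names of
// the extra analyzer rules it registers.
class ToyBaseBuilder {
  def extendedResolutionRules: Seq[String] =
    "FindDataSourceTable" +: "ResolveSQLOnFile" +: "FallBackFileDataSourceToV1" +: Nil
}

// The Hive-flavoured builder redefines the list from scratch instead of
// building on super.extendedResolutionRules, so new base rules are lost.
class ToyHiveBuilder extends ToyBaseBuilder {
  override def extendedResolutionRules: Seq[String] =
    "ResolveHiveSerdeTable" +: "FindDataSourceTable" +: "ResolveSQLOnFile" +: Nil
}

// Rules registered by the base builder but absent from the Hive builder.
def missingInHive: Seq[String] =
  new ToyBaseBuilder().extendedResolutionRules
    .diff(new ToyHiveBuilder().extendedResolutionRules)

println(missingInHive) // List(FallBackFileDataSourceToV1)
```

Because the subclass replaces the list wholesale, every new base rule must be added to both lists by hand, which is exactly what the review comment asks for.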
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183398030

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala ---
@@ -27,15 +27,13 @@ import org.apache.spark.sql.sources.v2.WriteSupport
  */
 trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister {
   /**
-   * Returns an optional V1 [[FileFormat]] class of the same file data source.
+   * Returns n V1 [[FileFormat]] class of the same file data source.
--- End diff --

n -> a
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183318946

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala ---
@@ -0,0 +1,43 @@
[...]
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.ReadSupport
+import org.apache.spark.sql.sources.v2.WriteSupport
+
+/**
+ * The base class for file data source v2. Implementations must have a public, 0-arg constructor.
+ *
+ * Note that this is an empty interface. Data source implementations should mix-in at least one of
+ * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just
+ * a dummy data source which is un-readable/writable.
+ */
+trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister {
+  /**
+   * Returns an optional V1 [[FileFormat]] class of the same file data source.
+   * This is a solution for the following cases:
+   * 1. File datasource V2 might be implemented partially during migration.
+   *    E.g. if [[ReadSupport]] is implemented while [[WriteSupport]] is not,
+   *    write path should fall back to V1 implementation.
+   * 2. File datasource V2 implementations cause regression.
+   * 3. Catalog support is required, which is still under development for data source V2.
+   */
+  def fallBackFileFormat: Option[Class[_]] = None
--- End diff --

I have added comments to explain this.
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183318856

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -89,8 +91,13 @@ case class DataSource(

   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

-  lazy val providingClass: Class[_] =
-    DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+  lazy val providingClass: Class[_] = {
+    val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+    cls.newInstance() match {
+      case f: FileDataSourceV2 => f.fallBackFileFormat.getOrElse(cls)
--- End diff --

I have added comments to explain this.
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183318881

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -213,6 +215,25 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
   }
 }

+/**
+ * Replaces [[FileDataSourceV2]] with [[DataSource]] if parent node is [[InsertIntoTable]].
+ */
+class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan] {
--- End diff --

I have added comments to explain this.
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183274730

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala ---
@@ -0,0 +1,43 @@
[...]
+trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister {
+  /**
+   * Returns an optional V1 [[FileFormat]] class of the same file data source.
+   * This is a solution for the following cases:
+   * 1. File datasource V2 might be implemented partially during migration.
+   *    E.g. if [[ReadSupport]] is implemented while [[WriteSupport]] is not,
+   *    write path should fall back to V1 implementation.
+   * 2. File datasource V2 implementations cause regression.
+   * 3. Catalog support is required, which is still under development for data source V2.
+   */
+  def fallBackFileFormat: Option[Class[_]] = None
--- End diff --

Why is it optional?
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183274704

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala ---
@@ -0,0 +1,43 @@
[...]
+/**
+ * The base class for file data source v2. Implementations must have a public, 0-arg constructor.
+ *
+ * Note that this is an empty interface. Data source implementations should mix-in at least one of
+ * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just
+ * a dummy data source which is un-readable/writable.
--- End diff --

We don't need to copy the javadoc of the parent class. Just say `A base interface for data source v2 implementations of the built-in file-based data sources.`
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183274588

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -213,6 +215,25 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
[...]
+/**
+ * Replaces [[FileDataSourceV2]] with [[DataSource]] if parent node is [[InsertIntoTable]].
+ */
+class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan] {
--- End diff --

This needs a few more comments explaining when this can happen.
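A heavily simplified sketch of the rewrite this rule performs, assuming that only the insert *target* needs the V1 fallback (for example, a temporary view backed by a `FileDataSourceV2` that has no working write path). The case classes are hypothetical stand-ins for Catalyst's `LogicalPlan`, `InsertIntoTable`, and relation nodes; Catalyst rules normally use the tree's built-in transform methods rather than hand-written recursion.

```scala
// Hypothetical, heavily simplified stand-ins for Catalyst plan nodes.
sealed trait Plan
case class V2FileRelation(source: String, fallbackV1: String) extends Plan
case class V1FileRelation(format: String) extends Plan
case class Project(child: Plan) extends Plan
case class InsertInto(table: Plan, query: Plan) extends Plan

// Walk the tree; when a V2 file relation is the target of an insert,
// swap it for its V1 format. V2 relations on the query side stay as-is.
def fallBackInsertTarget(plan: Plan): Plan = plan match {
  case InsertInto(V2FileRelation(_, v1), query) =>
    InsertInto(V1FileRelation(v1), fallBackInsertTarget(query))
  case InsertInto(table, query) =>
    InsertInto(table, fallBackInsertTarget(query))
  case Project(child) => Project(fallBackInsertTarget(child))
  case leaf           => leaf
}

val plan = InsertInto(
  V2FileRelation("parquet-v2", "ParquetFileFormat"),
  Project(V2FileRelation("parquet-v2", "ParquetFileFormat")))

println(fallBackInsertTarget(plan))
// InsertInto(V1FileRelation(ParquetFileFormat),Project(V2FileRelation(parquet-v2,ParquetFileFormat)))
```

Keeping the read side on V2 is what lets a partially migrated source exercise its new reader while writes still go through the V1 `FileFormat`.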
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21123#discussion_r183274501

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -89,8 +91,13 @@ case class DataSource(
[...]
+  lazy val providingClass: Class[_] = {
+    val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+    cls.newInstance() match {
+      case f: FileDataSourceV2 => f.fallBackFileFormat.getOrElse(cls)
--- End diff --

Why do we need this?
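The intent of the `providingClass` change can be modelled without Spark: look up the provider for a format name, instantiate it, and if it is a file source V2 that declares a V1 fallback, return the V1 class so the existing V1 code path in `DataSource` keeps working. The registry and class names below are hypothetical; the lookup is a plain `Map` of factories rather than `DataSource.lookupDataSource`, and instantiation uses a factory function instead of `cls.newInstance()`.

```scala
// Hypothetical stand-in for FileDataSourceV2, using the Option-based
// signature from the diff under review.
trait ToyFileDataSourceV2 {
  def fallBackFileFormat: Option[Class[_]] = None
}

class ToyParquetFileFormatV1 // a V1 format
class ToyCsvFileFormatV1     // a V1-only provider, not a file source V2
class ToyParquetDataSourceV2 extends ToyFileDataSourceV2 {
  override def fallBackFileFormat: Option[Class[_]] =
    Some(classOf[ToyParquetFileFormatV1])
}
class ToyJsonDataSourceV2 extends ToyFileDataSourceV2 // declares no fallback

// Hypothetical registry of provider factories keyed by format name.
val registry: Map[String, () => AnyRef] = Map(
  "parquet" -> (() => new ToyParquetDataSourceV2),
  "json"    -> (() => new ToyJsonDataSourceV2),
  "csv"     -> (() => new ToyCsvFileFormatV1))

// Same shape as the reviewed providingClass: instantiate the provider and,
// for a file source V2 with a declared fallback, return the V1 class.
def providingClass(name: String): Class[_] = {
  val instance = registry(name)()
  instance match {
    case f: ToyFileDataSourceV2 => f.fallBackFileFormat.getOrElse(instance.getClass)
    case other                  => other.getClass
  }
}

println(providingClass("parquet") == classOf[ToyParquetFileFormatV1]) // true
println(providingClass("json") == classOf[ToyJsonDataSourceV2])       // true
```

Note that every V2 file source using this fallback is redirected to V1 for all paths resolved through `providingClass`, which is one reason the reviewer questions the change.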
[GitHub] spark pull request #21123: [SPARK-24045][SQL}Create base class for file data...
GitHub user gengliangwang opened a pull request:
https://github.com/apache/spark/pull/21123

[SPARK-24045][SQL}Create base class for file data source v2

## What changes were proposed in this pull request?

From #20933, we can see that during the file data source V2 migration, falling back to the V1 implementation is necessary in the following cases:
1. A file data source V2 might be only partially implemented during the migration. E.g. if `ReadSupport` is implemented but `WriteSupport` is not, the write path should fall back to the V1 implementation.
2. A file data source V2 implementation causes a regression.
3. Catalog support is required, which is still under development for data source V2.

This PR creates a base class, `FileDataSourceV2`, and resolves these fallback problems.

## How was this patch tested?

Unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gengliangwang/spark FileDataSourceV2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21123.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21123

commit 95628e5a027d029be7dcc4e8e979555bc5e0e4a3
Author: Gengliang Wang
Date: 2018-04-22T13:41:28Z

add FileDataSourceV2 and fall back mechanism
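Case 1 above amounts to choosing an implementation per operation. A minimal sketch, assuming hypothetical marker traits `CanRead`/`CanWrite` in place of `ReadSupport`/`WriteSupport`: an operation uses V2 only if the source implements it, and otherwise falls back to the declared V1 format. This mirrors the expectation in the test suite quoted earlier that writing through a read-only V2 source falls back to V1.

```scala
// Hypothetical marker traits standing in for ReadSupport / WriteSupport.
trait CanRead
trait CanWrite

// Hypothetical file source V2 that names its V1 fallback format.
trait ToyFileSourceV2 {
  def v1Format: String
}

// Implements only the read path, like DummyReadOnlyFileDataSourceV2.
class ReadOnlyParquetV2 extends ToyFileSourceV2 with CanRead {
  val v1Format = "ParquetFileFormat"
}

sealed trait Op
case object ReadOp extends Op
case object WriteOp extends Op

// Use V2 for an operation only if the source implements that capability;
// otherwise fall back to the V1 format for that operation alone.
def chooseImpl(src: ToyFileSourceV2, op: Op): String = (src, op) match {
  case (_: CanRead, ReadOp)   => "v2"
  case (_: CanWrite, WriteOp) => "v2"
  case _                      => s"v1:${src.v1Format}"
}

val readOnly = new ReadOnlyParquetV2
println(chooseImpl(readOnly, ReadOp))  // v2
println(chooseImpl(readOnly, WriteOp)) // v1:ParquetFileFormat
```

Cases 2 and 3 (regressions, missing catalog support) are not capability checks and would need a separate switch, which this sketch does not model.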