[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-07-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r204264532
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1191,6 +1191,20 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(100)
 
+  val DISABLED_V2_FILE_DATA_SOURCE_READERS =
+    buildConf("spark.sql.disabledV2FileDataSourceReaders")
--- End diff --

Hm, are we going to leave this out for 2.4 and target the complete migration for 3.0? Or do you also plan to migrate them within 2.4? It looks like there's not much gain from this change alone when the release is this close.
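
As a rough illustration of how such a conf is typically consumed (a minimal self-contained sketch in plain Scala; only the conf key comes from the diff above, every other name is an assumption, not Spark's actual lookup code):

```scala
// Sketch: resolving whether to use a V2 reader given a comma-separated
// "disabled V2 readers" conf value. Names other than the conf key are assumed.
object DisabledV2ReadersSketch {
  val confKey = "spark.sql.disabledV2FileDataSourceReaders"

  // Returns true if the V2 reader for this short name should be used.
  def useV2Reader(shortName: String, confValue: String): Boolean = {
    val disabled = confValue.split(",").map(_.trim.toLowerCase).filter(_.nonEmpty).toSet
    !disabled.contains(shortName.toLowerCase)
  }

  def main(args: Array[String]): Unit = {
    println(useV2Reader("orc", "parquet,orc"))  // false: fall back to the V1 reader
    println(useV2Reader("json", "parquet,orc")) // true: keep the V2 reader
  }
}
```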


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-05-01 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r185215070
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -213,6 +215,26 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
   }
 }
 
+/**
+ * Replace the V2 data source of a table in [[InsertIntoTable]] with the V1 [[FileFormat]].
+ * E.g., with a temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
+ * since there is no corresponding physical plan.
+ * This is a temporary hack for making current data source V2 work.
--- End diff --

This is definitely temporary. We have to add this; otherwise we can't 
migrate file data sources to V2.
I believe @cloud-fan has a plan for getting rid of this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r185055954
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -213,6 +215,26 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
   }
 }
 
+/**
+ * Replace the V2 data source of a table in [[InsertIntoTable]] with the V1 [[FileFormat]].
+ * E.g., with a temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
+ * since there is no corresponding physical plan.
+ * This is a temporary hack for making current data source V2 work.
--- End diff --

I don't like the idea of "temporary hack" rules. What is the long-term plan 
for getting rid of this? Or is this something we will end up supporting forever?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183589528
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2
+
+import java.util.{List => JList, Optional}
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetTest}
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 with ReadSupport {
+  class DummyFileReader extends DataSourceReader {
+    override def readSchema(): StructType = {
+      throw new AnalysisException("hehe")
+    }
+
+    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
+      java.util.Arrays.asList()
+  }
+
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
+    throw new AnalysisException("Dummy file reader")
+  }
+
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+}
+
+class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 with WriteSupport {
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+
+  override def createWriter(
+      jobId: String,
+      schema: StructType,
+      mode: SaveMode,
+      options: DataSourceOptions): Optional[DataSourceWriter] = {
+    throw new AnalysisException("Dummy file writer")
+  }
+}
+
+class SimpleFileDataSourceV2 extends SimpleDataSourceV2 with FileDataSourceV2 {
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+}
+
+class FileDataSourceV2Suite extends QueryTest with ParquetTest with SharedSQLContext {
+  class DummyFileWriter extends DataSourceWriter {
+    override def createWriterFactory(): DataWriterFactory[Row] = {
+      throw new AnalysisException("hehe")
+    }
+
+    override def commit(messages: Array[WriterCommitMessage]): Unit = {}
+
+    override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+  }
+
+  private val dummyParquetReaderV2 = classOf[DummyReadOnlyFileDataSourceV2].getName
+  private val dummyParquetWriterV2 = classOf[DummyWriteOnlyFileDataSourceV2].getName
+  private val simpleFileDataSourceV2 = classOf[SimpleFileDataSourceV2].getName
+  private val parquetV1 = classOf[ParquetFileFormat].getCanonicalName
+
+  test("Fall back to v1 when writing to file with read only FileDataSourceV2") {
+    val df = spark.range(1, 10).toDF()
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+      // Writing file should fall back to v1 and succeed.
+      df.write.format(dummyParquetReaderV2).save(path)
+
+      // Validate write result with [[ParquetFileFormat]].
+      checkAnswer(spark.read.format(parquetV1).load(path), df)
+
+      // Dummy File reader should fail as expected.
--- End diff --

`Dummy File reader should be picked and fail as expected`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183589427
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2
+
+import java.util.{List => JList, Optional}
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetTest}
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 with ReadSupport {
+  class DummyFileReader extends DataSourceReader {
+    override def readSchema(): StructType = {
+      throw new AnalysisException("hehe")
+    }
+
+    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
+      java.util.Arrays.asList()
+  }
+
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
+    throw new AnalysisException("Dummy file reader")
+  }
+
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+}
+
+class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 with WriteSupport {
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+
+  override def createWriter(
+      jobId: String,
+      schema: StructType,
+      mode: SaveMode,
+      options: DataSourceOptions): Optional[DataSourceWriter] = {
+    throw new AnalysisException("Dummy file writer")
+  }
+}
+
+class SimpleFileDataSourceV2 extends SimpleDataSourceV2 with FileDataSourceV2 {
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+}
+
+class FileDataSourceV2Suite extends QueryTest with ParquetTest with SharedSQLContext {
+  class DummyFileWriter extends DataSourceWriter {
+    override def createWriterFactory(): DataWriterFactory[Row] = {
+      throw new AnalysisException("hehe")
+    }
+
+    override def commit(messages: Array[WriterCommitMessage]): Unit = {}
+
+    override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+  }
+
+  private val dummyParquetReaderV2 = classOf[DummyReadOnlyFileDataSourceV2].getName
+  private val dummyParquetWriterV2 = classOf[DummyWriteOnlyFileDataSourceV2].getName
+  private val simpleFileDataSourceV2 = classOf[SimpleFileDataSourceV2].getName
+  private val parquetV1 = classOf[ParquetFileFormat].getCanonicalName
+
+  test("Fall back to v1 when writing to file with read only FileDataSourceV2") {
+    val df = spark.range(1, 10).toDF()
--- End diff --

nit: `val df = spark.range(10)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183589226
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2Suite.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2
+
+import java.util.{List => JList, Optional}
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetTest}
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 with ReadSupport {
+  class DummyFileReader extends DataSourceReader {
+    override def readSchema(): StructType = {
+      throw new AnalysisException("hehe")
+    }
+
+    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
+      java.util.Arrays.asList()
+  }
+
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
+    throw new AnalysisException("Dummy file reader")
+  }
+
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+}
+
+class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 with WriteSupport {
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+
+  override def createWriter(
+      jobId: String,
+      schema: StructType,
+      mode: SaveMode,
+      options: DataSourceOptions): Optional[DataSourceWriter] = {
+    throw new AnalysisException("Dummy file writer")
+  }
+}
+
+class SimpleFileDataSourceV2 extends SimpleDataSourceV2 with FileDataSourceV2 {
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+
+  override def shortName(): String = "parquet"
+}
+
+class FileDataSourceV2Suite extends QueryTest with ParquetTest with SharedSQLContext {
--- End diff --

`FileDataSourceV2FallbackSuite`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183589146
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 ---
@@ -158,6 +158,7 @@ abstract class BaseSessionStateBuilder(
     override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
       new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
+        new FallBackFileDataSourceToV1(session) +:
--- End diff --

We also need to add this rule to `HiveSessionStateBuilder.analyzer`
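
A hedged sketch of the suggested registration, assuming the Hive builder mirrors the `BaseSessionStateBuilder` shape quoted above (not self-contained; the rule names other than the new one are recalled from the 2.3-era codebase, not taken from this diff):

```scala
// Sketch only: HiveSessionStateBuilder.analyzer would register the same
// fallback rule alongside its existing resolution rules.
override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    new ResolveHiveSerdeTable(session) +:
      new FindDataSourceTable(session) +:
      new ResolveSQLOnFile(session) +:
      new FallBackFileDataSourceToV1(session) +:  // the new rule, added here too
      customResolutionRules
}
```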


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183398030
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
 ---
@@ -27,15 +27,13 @@ import org.apache.spark.sql.sources.v2.WriteSupport
  */
 trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister {
   /**
-   * Returns an optional V1 [[FileFormat]] class of the same file data source.
+   * Returns n V1 [[FileFormat]] class of the same file data source.
--- End diff --

n -> a


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-23 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183318946
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.ReadSupport
+import org.apache.spark.sql.sources.v2.WriteSupport
+
+/**
+ * The base class for file data source v2. Implementations must have a public, 0-arg constructor.
+ *
+ * Note that this is an empty interface. Data source implementations should mix-in at least one of
+ * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just
+ * a dummy data source which is un-readable/writable.
+ */
+trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister {
+  /**
+   * Returns an optional V1 [[FileFormat]] class of the same file data source.
+   * This is a solution for the following cases:
+   * 1. File datasource V2 might be implemented partially during migration.
+   *    E.g. if [[ReadSupport]] is implemented while [[WriteSupport]] is not,
+   *    write path should fall back to V1 implementation.
+   * 2. File datasource V2 implementations cause regression.
+   * 3. Catalog support is required, which is still under development for data source V2.
+   */
+  def fallBackFileFormat: Option[Class[_]] = None
--- End diff --

Comments are added to explain this.
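
For illustration, a self-contained toy of the fallback contract (plain Scala stand-ins, not Spark's classes; all names are assumptions):

```scala
// Toy model: a V2 file source declares which V1 format class to use when
// Spark has to fall back. Everything here is illustrative.
trait ToyFileFormat
class ToyParquetFileFormat extends ToyFileFormat

trait ToyFileDataSourceV2 {
  def shortName(): String
  // Optional, matching the version quoted above: None means no V1 fallback.
  def fallBackFileFormat: Option[Class[_ <: ToyFileFormat]] = None
}

class ToyParquetV2 extends ToyFileDataSourceV2 {
  override def shortName(): String = "parquet"
  override def fallBackFileFormat: Option[Class[_ <: ToyFileFormat]] =
    Some(classOf[ToyParquetFileFormat])
}

object ToyFallbackDemo {
  def main(args: Array[String]): Unit = {
    val source = new ToyParquetV2
    // Prints Some(ToyParquetFileFormat): the V1 class to fall back to.
    println(source.fallBackFileFormat.map(_.getSimpleName))
  }
}
```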


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-23 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183318856
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -89,8 +91,13 @@ case class DataSource(
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
-  lazy val providingClass: Class[_] =
-    DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+  lazy val providingClass: Class[_] = {
+    val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+    cls.newInstance() match {
+      case f: FileDataSourceV2 => f.fallBackFileFormat.getOrElse(cls)
--- End diff --

Comments are added to explain this.
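
A self-contained toy of the `providingClass` resolution quoted above (assumed names; not the actual `DataSource` code):

```scala
// Toy model: if the looked-up class is a V2 file source that declares a V1
// format, resolve to the V1 class so FileFormat-based code paths keep working.
trait ToyFormat
class ToyParquetFormat extends ToyFormat
trait ToyV2Source { def fallBackFileFormat: Option[Class[_]] }
class ToyParquetV2Source extends ToyV2Source {
  override def fallBackFileFormat: Option[Class[_]] = Some(classOf[ToyParquetFormat])
}

object ProvidingClassToy {
  def providingClass(cls: Class[_]): Class[_] =
    cls.getDeclaredConstructor().newInstance() match {
      case f: ToyV2Source => f.fallBackFileFormat.getOrElse(cls)
      case _              => cls
    }

  def main(args: Array[String]): Unit = {
    println(providingClass(classOf[ToyParquetV2Source]).getSimpleName) // ToyParquetFormat
    println(providingClass(classOf[ToyParquetFormat]).getSimpleName)   // ToyParquetFormat
  }
}
```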


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-23 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183318881
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -213,6 +215,25 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
   }
 }
 
+/**
+ * Replaces [[FileDataSourceV2]] with [[DataSource]] if parent node is [[InsertIntoTable]].
+ */
+class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan] {
--- End diff --

Comments are added to explain this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183274730
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.ReadSupport
+import org.apache.spark.sql.sources.v2.WriteSupport
+
+/**
+ * The base class for file data source v2. Implementations must have a public, 0-arg constructor.
+ *
+ * Note that this is an empty interface. Data source implementations should mix-in at least one of
+ * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just
+ * a dummy data source which is un-readable/writable.
+ */
+trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister {
+  /**
+   * Returns an optional V1 [[FileFormat]] class of the same file data source.
+   * This is a solution for the following cases:
+   * 1. File datasource V2 might be implemented partially during migration.
+   *    E.g. if [[ReadSupport]] is implemented while [[WriteSupport]] is not,
+   *    write path should fall back to V1 implementation.
+   * 2. File datasource V2 implementations cause regression.
+   * 3. Catalog support is required, which is still under development for data source V2.
+   */
+  def fallBackFileFormat: Option[Class[_]] = None
--- End diff --

why it's optional?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183274704
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.ReadSupport
+import org.apache.spark.sql.sources.v2.WriteSupport
+
+/**
+ * The base class for file data source v2. Implementations must have a public, 0-arg constructor.
+ *
+ * Note that this is an empty interface. Data source implementations should mix-in at least one of
+ * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just
+ * a dummy data source which is un-readable/writable.
--- End diff --

We won't need to copy the javadoc for the parent class. Just say `A base 
interface for data source v2 implementations of the built-in file-based data 
sources.`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183274588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -213,6 +215,25 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
   }
 }
 
+/**
+ * Replaces [[FileDataSourceV2]] with [[DataSource]] if parent node is [[InsertIntoTable]].
+ */
+class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan] {
--- End diff --

Need a few more comments about when this can happen
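
For example, a self-contained toy of when such a rule would fire (plain Scala stand-ins for the catalyst classes; all names assumed):

```scala
// Toy model: only an insert whose target resolved to a V2 file source is
// rewritten to its V1 fallback, e.g. INSERT INTO a temp view backed by
// FileDataSourceV2, which otherwise has no physical plan.
sealed trait ToyPlan
case class V2FileRelation(source: String, v1Fallback: String) extends ToyPlan
case class V1FileRelation(format: String) extends ToyPlan
case class InsertIntoToy(table: ToyPlan, query: ToyPlan) extends ToyPlan
case object LeafQuery extends ToyPlan

object FallBackRuleToy {
  def apply(plan: ToyPlan): ToyPlan = plan match {
    case InsertIntoToy(V2FileRelation(_, fallback), query) =>
      InsertIntoToy(V1FileRelation(fallback), apply(query))
    case other => other // reads and non-file inserts are left untouched
  }

  def main(args: Array[String]): Unit = {
    val plan = InsertIntoToy(V2FileRelation("parquet", "ParquetFileFormat"), LeafQuery)
    println(apply(plan)) // InsertIntoToy(V1FileRelation(ParquetFileFormat),LeafQuery)
  }
}
```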


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183274501
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -89,8 +91,13 @@ case class DataSource(
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
-  lazy val providingClass: Class[_] =
-    DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+  lazy val providingClass: Class[_] = {
+    val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+    cls.newInstance() match {
+      case f: FileDataSourceV2 => f.fallBackFileFormat.getOrElse(cls)
--- End diff --

why do we need this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-22 Thread gengliangwang
GitHub user gengliangwang opened a pull request:

https://github.com/apache/spark/pull/21123

[SPARK-24045][SQL]Create base class for file data source v2

## What changes were proposed in this pull request?
From #20933, we can see that during file data source V2 migration, falling back to the V1 implementation is necessary in the following cases:
1. File datasource V2 might be implemented partially during migration.
  E.g. if `ReadSupport` is implemented while `WriteSupport` is not, the write path should fall back to the V1 implementation.
2. File datasource V2 implementations cause regressions.
3. Catalog support is required, which is still under development for data source V2.

This PR creates a base class `FileDataSourceV2` and resolves these fallback problems.
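
For intuition, here is a minimal self-contained sketch of that fallback decision (plain Scala; the names are illustrative assumptions, not this PR's classes):

```scala
// Toy model: pick the V2 path only when the source implements the needed
// capability; otherwise resolve to its declared V1 FileFormat (case 1 above).
sealed trait Capability
case object Read extends Capability
case object Write extends Capability

case class Source(name: String, v2Capabilities: Set[Capability], v1Format: String)

object FallbackDecision {
  def resolve(src: Source, needed: Capability): String =
    if (src.v2Capabilities.contains(needed)) s"${src.name} (data source V2)"
    else src.v1Format // partial implementation: fall back to V1

  def main(args: Array[String]): Unit = {
    val readOnlyParquetV2 = Source("parquet", Set(Read), "ParquetFileFormat")
    println(resolve(readOnlyParquetV2, Read))  // parquet (data source V2)
    println(resolve(readOnlyParquetV2, Write)) // ParquetFileFormat (V1 fallback)
  }
}
```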

## How was this patch tested?

Unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gengliangwang/spark FileDataSourceV2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21123.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21123


commit 95628e5a027d029be7dcc4e8e979555bc5e0e4a3
Author: Gengliang Wang 
Date:   2018-04-22T13:41:28Z

add FileDataSourceV2 and fall back mechanism




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org