[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212475271
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

Yeah, it is only ignored during schema reading.

The change is in the timing of when corrupt files are detected: it is now postponed until the file contents are actually read.

That might not be a big deal, though for the user experience it is better to throw such an exception early.


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212373914
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

@viirya . The corrupt files are not ignored. Spark will throw a `SparkException` while reading the content.
> Now if the Orc source reads the first valid schema, it doesn't read other Orc files further. So the corrupt files are ignored when `SQLConf.IGNORE_CORRUPT_FILES` is false.


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22157


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212304433
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

Ok. It's reasonable.


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212302555
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

I agree with this take


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212252932
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

Let's make sure we don't backport it ... then I think it's fine. It sounds rather like a bug to read and validate all the schemas (which is inconsistent with Parquet) when we only need to pick up a single file. I don't think we make any guarantee about the picking order.

The possible behaviour change is when we only read the schema: the previous code would throw an exception, but after this PR it wouldn't.

The previous behaviour is something we should expect once the mergeSchema option is implemented on the ORC side, as you guys discussed below.


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212229343
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

Yeah, I think so. But in Parquet, schema merging is done in parallel, so it won't create all the readers in one place. A rough sketch of that idea is below.
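
For reference, a hedged sketch of distributing the per-file footer reads as a Spark job: the `readFooterSchema` helper and the field-union merge below are illustrative assumptions, not real Spark APIs (the actual Parquet path is `ParquetFileFormat.mergeSchemasInParallel`, which also reconciles differing types properly).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object ParallelSchemaMergeSketch {
  // Hypothetical per-file footer read; a real implementation would open the
  // ORC/Parquet footer on an executor. Here it just returns a fixed schema.
  def readFooterSchema(path: String): Option[StructType] =
    Some(StructType.fromDDL("a BIGINT"))

  def inferMergedSchema(spark: SparkSession, paths: Seq[String]): Option[StructType] = {
    // Fan the footer reads out as a Spark job instead of opening every file
    // (and creating every reader) on the driver.
    val schemas = spark.sparkContext
      .parallelize(paths)
      .flatMap(p => readFooterSchema(p).toSeq)
      .collect()
    // Naive merge for illustration only: union of fields by name.
    schemas.reduceLeftOption { (a, b) =>
      StructType(a ++ b.filterNot(f => a.fieldNames.contains(f.name)))
    }
  }
}
```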


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212227737
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

BTW, I think we have to create a reader for each file when implementing schema merging like Parquet, right?


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212225224
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
    }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

I think this is because of the behavior change discussed in https://github.com/apache/spark/pull/22157#discussion_r212223394.

Previously the Orc source read the third file, which is corrupt, and threw the `Could not read footer for file` exception.

Now the Orc source reads the first file to get a valid schema and skips the other two files. When the Orc source later uses that schema to read the second Orc file, the schema is not consistent, so the `Malformed ORC file` exception is thrown.




---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212223394
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

This might be a behavior change.

Previously, if there are corrupt files and `SQLConf.IGNORE_CORRUPT_FILES` is false, the Orc source throws an exception when reading those files.

Now, once the Orc source reads the first valid schema, it doesn't read the other Orc files any further. So the corrupt files are ignored when `SQLConf.IGNORE_CORRUPT_FILES` is false. A minimal sketch of the difference is below.
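
To make the timing concrete, here is a minimal, self-contained sketch in plain Scala. It is not the actual Spark code: `readSchema` below is a hypothetical stand-in for `OrcUtils.readSchema` that only records which files get opened, so the eager (old) and lazy (new) strategies can be compared.

```scala
object SchemaInferenceTimingSketch extends App {
  // Records which files were "opened" by the stand-in readSchema below.
  val opened = scala.collection.mutable.ArrayBuffer.empty[String]

  // Hypothetical stand-in for OrcUtils.readSchema: a corrupt file would either
  // throw (ignoreCorruptFiles = false) or yield None (ignoreCorruptFiles = true);
  // None is used here for brevity.
  def readSchema(file: String): Option[String] = {
    opened += file
    if (file.endsWith(".corrupt")) None else Some("struct<a:bigint>")
  }

  val files = Seq("part-0.orc", "part-1.corrupt", "part-2.corrupt")

  // Old code (eager): every file is opened up front, so a corrupt file anywhere
  // in the list is noticed during schema inference.
  files.flatMap(f => readSchema(f)).headOption
  println(s"eager: opened $opened")   // all three files

  opened.clear()

  // New code (lazy): the iterator stops at the first file that yields a schema,
  // so the remaining (possibly corrupt) files are never opened at this point.
  files.toIterator.map(f => readSchema(f)).collectFirst { case Some(s) => s }
  println(s"lazy:  opened $opened")   // only part-0.orc
}
```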


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212178321
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

Why was the error message changed?


---
