[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212475271

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

Yeah, they are only ignored while reading the schema. What changes is the timing of when the corrupt files are detected: detection is now postponed until the file contents are actually read. That might not be a big deal, though for user experience it is better to throw such an exception early.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212373914

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

@viirya . The corrupt files are not ignored. Spark will throw `SparkException` while reading the content.

> Now if Orc source reads the first valid schema, it doesn't read other Orc files further. So the corrupt files are ignored when SQLConf.IGNORE_CORRUPT_FILES is false.
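
The distinction drawn in this exchange (corrupt files are not skipped, they are only detected later) can be illustrated with a minimal sketch in plain Scala, without Spark. Everything here is hypothetical: `FakeFile`, `readFooter`, `inferSchema` and `countRows` are illustration-only names, and pairing each error message with a phase is an assumption based on the messages quoted in this thread, not a description of Spark's actual code paths.

```scala
object CorruptFileTimingSketch {
  // Hypothetical stand-in for a file; not a Spark or Hadoop type.
  final case class FakeFile(name: String, corrupt: Boolean)

  // Simulated footer read with ignoreCorruptFiles = false: a corrupt file throws.
  def readFooter(file: FakeFile): Option[String] =
    if (file.corrupt) {
      throw new IllegalStateException(s"Could not read footer for file: ${file.name}")
    } else {
      Some("struct<a:bigint>")
    }

  // Phase 1 (simulated schema inference): the iterator is lazy, so files after
  // the first readable one are never opened.
  def inferSchema(files: Seq[FakeFile]): Option[String] =
    files.iterator.map(readFooter).collectFirst { case Some(schema) => schema }

  // Phase 2 (simulated scan): every file is read, so a corrupt file surfaces here.
  def countRows(files: Seq[FakeFile]): Long =
    files.map { f =>
      if (f.corrupt) throw new IllegalStateException(s"Malformed ORC file ${f.name}")
      1L
    }.sum

  def main(args: Array[String]): Unit = {
    val files = Seq(
      FakeFile("first", corrupt = false),
      FakeFile("second", corrupt = false),
      FakeFile("third", corrupt = true))

    println(s"schema inference: ${scala.util.Try(inferSchema(files))}")
    println(s"scan: ${scala.util.Try(countRows(files))}")
  }
}
```

In this model, schema inference succeeds because it never reaches the corrupt file, and the failure appears only when the scan touches every file.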
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22157
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212304433

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

Ok. It's reasonable.
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212302555

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

I agree with this take
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212252932

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

Let's make sure we don't backport it ... then I think it's fine. It sounds rather like a bug to read and validate all schemas (which is inconsistent with Parquet) when we only need to pick up a single file. I don't think we make any guarantee about the picking order.

The possible behaviour change is when we only read the schema. The previous code would throw an exception, but after this PR it wouldn't. The previous behaviour is something we should expect once the mergeSchema option is implemented on the ORC side, as you guys discussed below.
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212229343

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

Yeah, I think so. But in Parquet, schema merging is done in parallel, so it won't create all readers in one place.
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212227737

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

BTW, I think we have to create a reader for each file when implementing schema merging like parquet, right?
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212225224

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

I think this is because of the behavior change https://github.com/apache/spark/pull/22157#discussion_r212223394. Previously, the Orc source read the third file, which is corrupt, and threw the `Could not read footer for file` exception. Now the Orc source reads the first file for a valid schema and skips the other two files.

When the Orc source uses that schema to read the second Orc file, the schema is not consistent, so the `Malformed ORC file` exception is thrown.
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212223394

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala ---
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
--- End diff --

This might be a behavior change. Previously, if there were corrupt files and `SQLConf.IGNORE_CORRUPT_FILES` was false, the Orc source would throw an exception when reading those files. Now if Orc source reads the first valid schema, it doesn't read other Orc files further. So the corrupt files are ignored when `SQLConf.IGNORE_CORRUPT_FILES` is false.
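
To make the eager-versus-lazy difference in the diff above concrete, here is a minimal sketch in plain Scala that mimics the shape of the removed and added lines. It is not the Spark code: `FakeFile`, `readersOpened`, `eagerSchema`, `lazySchema` and this `readSchema` are hypothetical stand-ins, and the sketch uses `iterator` instead of `toIterator` only because `toIterator` is deprecated in newer Scala versions.

```scala
object LazySchemaInferenceSketch {
  // Hypothetical stand-in for a file status; not a Spark or Hadoop type.
  final case class FakeFile(name: String, corrupt: Boolean)

  // Counts simulated reader creations so the two shapes can be compared.
  var readersOpened = 0

  // Hypothetical stand-in for OrcUtils.readSchema: None for a corrupt file when
  // ignoreCorruptFiles is true, an exception otherwise.
  def readSchema(file: FakeFile, ignoreCorruptFiles: Boolean): Option[String] = {
    readersOpened += 1
    if (!file.corrupt) Some("struct<a:bigint>")
    else if (ignoreCorruptFiles) None
    else throw new RuntimeException(s"Could not read footer for file: ${file.name}")
  }

  // Shape of the removed line: every file is read before headOption is taken.
  def eagerSchema(files: Seq[FakeFile], ignoreCorruptFiles: Boolean): Option[String] =
    files.flatMap(f => readSchema(f, ignoreCorruptFiles)).headOption

  // Shape of the added line: the iterator stops at the first Some(schema).
  def lazySchema(files: Seq[FakeFile], ignoreCorruptFiles: Boolean): Option[String] =
    files.iterator.map(f => readSchema(f, ignoreCorruptFiles)).collectFirst {
      case Some(schema) => schema
    }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      FakeFile("first", corrupt = false),
      FakeFile("second", corrupt = false),
      FakeFile("third", corrupt = true))

    readersOpened = 0
    val eager = scala.util.Try(eagerSchema(files, ignoreCorruptFiles = false))
    println(s"eager: $eager, readers opened: $readersOpened")

    readersOpened = 0
    val lzy = scala.util.Try(lazySchema(files, ignoreCorruptFiles = false))
    println(s"lazy: $lzy, readers opened: $readersOpened")
  }
}
```

With the corrupt file placed last, the eager shape opens a reader per file and fails on the third, while the lazy shape opens a single reader and never reaches the corrupt one, which is the behavior change described in this comment.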
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212178321

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

Why did the error message change?