spark git commit: [SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 03008e049 -> 4dc7d377f

[SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException

## What changes were proposed in this pull request?
This patch appends a message to suggest users running refresh table or reloading data frames when Spark sees a FileNotFoundException due to stale, cached metadata.

## How was this patch tested?
Added a unit test for this in MetadataCacheSuite.

Author: petermaxlee

Closes #14003 from petermaxlee/SPARK-16336.

(cherry picked from commit fb41670c9263a89ec233861cc91a19cf1bb19073)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dc7d377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dc7d377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dc7d377

Branch: refs/heads/branch-2.0
Commit: 4dc7d377fba39147d8820a5a2866a2fbcb73db98
Parents: 03008e0
Author: petermaxlee
Authored: Thu Jun 30 16:49:59 2016 -0700
Committer: Reynold Xin
Committed: Thu Jun 30 16:50:06 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/FileScanRDD.scala | 15 +++-
 .../apache/spark/sql/MetadataCacheSuite.scala   | 88 ++++++++++++++++++++
 2 files changed, 102 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4dc7d377/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index f7f68b1..1314c94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -111,7 +111,20 @@ class FileScanRDD(
         currentFile = files.next()
         logInfo(s"Reading File $currentFile")
         InputFileNameHolder.setInputFileName(currentFile.filePath)
-        currentIterator = readFunction(currentFile)
+
+        try {
+          currentIterator = readFunction(currentFile)
+        } catch {
+          case e: java.io.FileNotFoundException =>
+            throw new java.io.FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved."
+            )
+        }
+
         hasNext
       } else {
         currentFile = null

http://git-wip-us.apache.org/repos/asf/spark/blob/4dc7d377/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
new file mode 100644
index 0000000..d872f4b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test suite to handle metadata cache related.
+ */
+class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+
+  /** Removes one data file in the given directory. */
+  private def deleteOneFileInDirectory(dir: File): Unit = {
+    assert(dir.isDirectory)
+    val oneFile = dir.listFiles().find { file =>
+      !file.getName.startsWith("_") && !file.getName.startsWith(".")
+    }
+    assert(oneFile.isDefined)
+    oneFile.foreach(_.delete())
+  }
+
+  test("SPARK-16336 Suggest doing table refresh when encountering FileNotFoundException") {
[message truncated in archive]
spark git commit: [SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException
Repository: spark
Updated Branches:
  refs/heads/master 5d00a7bc1 -> fb41670c9

[SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException

## What changes were proposed in this pull request?
This patch appends a message to suggest users running refresh table or reloading data frames when Spark sees a FileNotFoundException due to stale, cached metadata.

## How was this patch tested?
Added a unit test for this in MetadataCacheSuite.

Author: petermaxlee

Closes #14003 from petermaxlee/SPARK-16336.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb41670c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb41670c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb41670c

Branch: refs/heads/master
Commit: fb41670c9263a89ec233861cc91a19cf1bb19073
Parents: 5d00a7b
Author: petermaxlee
Authored: Thu Jun 30 16:49:59 2016 -0700
Committer: Reynold Xin
Committed: Thu Jun 30 16:49:59 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/FileScanRDD.scala | 15 +++-
 .../apache/spark/sql/MetadataCacheSuite.scala   | 88 ++++++++++++++++++++
 2 files changed, 102 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/fb41670c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 1443057..c66da3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -117,7 +117,20 @@ class FileScanRDD(
         currentFile = files.next()
         logInfo(s"Reading File $currentFile")
         InputFileNameHolder.setInputFileName(currentFile.filePath)
-        currentIterator = readFunction(currentFile)
+
+        try {
+          currentIterator = readFunction(currentFile)
+        } catch {
+          case e: java.io.FileNotFoundException =>
+            throw new java.io.FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved."
+            )
+        }
+
         hasNext
       } else {
         currentFile = null

http://git-wip-us.apache.org/repos/asf/spark/blob/fb41670c/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
new file mode 100644
index 0000000..d872f4b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test suite to handle metadata cache related.
+ */
+class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+
+  /** Removes one data file in the given directory. */
+  private def deleteOneFileInDirectory(dir: File): Unit = {
+    assert(dir.isDirectory)
+    val oneFile = dir.listFiles().find { file =>
+      !file.getName.startsWith("_") && !file.getName.startsWith(".")
+    }
+    assert(oneFile.isDefined)
+    oneFile.foreach(_.delete())
+  }
+
+  test("SPARK-16336 Suggest doing table refresh when encountering FileNotFoundException") {
+    withTempPath { (location: File) =>
+      // Create a Parquet directory
+      spark.range(start = 0, end
[message truncated in archive]