Repository: spark
Updated Branches:
  refs/heads/master 06df34d35 -> e3536406e


[SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible

## What changes were proposed in this pull request?

`BasicWriteTaskStatsTracker.getFileSize()` now catches `FileNotFoundException`, 
logs the missing file, and returns `None` instead of a file size.

This ensures that if a newly created file isn't yet visible, because the store 
does not offer create consistency, metrics collection does not fail the write.
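
The core of the change is this pattern in `getFileSize()` (condensed from the diff below); callers treat `None` as "file not found" and skip that file in the byte and file counts:

```scala
private def getFileSize(filePath: String): Option[Long] = {
  val path = new Path(filePath)
  val fs = path.getFileSystem(hadoopConf)
  try {
    Some(fs.getFileStatus(path).getLen())
  } catch {
    case e: FileNotFoundException =>
      // may arise against eventually consistent object stores
      logDebug(s"File $path is not yet visible", e)
      None
  }
}
```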

## How was this patch tested?

New test suite included, `BasicWriteTaskStatsTrackerSuite`. It not only 
checks resilience to missing files, but also verifies the existing logic for 
how file statistics are gathered.

Note two behaviours of the current implementation:

1. If you call `Tracker.getFinalStats()` more than once, the file size count 
will increase by the size of the last file. This could be fixed by clearing the 
filename field inside `getFinalStats()` itself.

2. If you pass an empty or null string to `Tracker.newFile(path)`, an 
`IllegalArgumentException` is raised, but only in `getFinalStats()` rather than 
in `newFile()`; see the sketch below. There's a test for this behaviour in the 
new suite, as it verifies that only `FileNotFoundException`s get swallowed.
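
A minimal sketch of that deferred failure, under the same assumptions as the new tests (the path string is validated only when a `Path` is constructed from it):

```scala
import org.apache.hadoop.conf.Configuration

val tracker = new BasicWriteTaskStatsTracker(new Configuration())
tracker.newFile("")      // accepted: the path string is only recorded here
tracker.getFinalStats()  // throws IllegalArgumentException once a Path is built
```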

Author: Steve Loughran <ste...@hortonworks.com>

Closes #18979 from steveloughran/cloud/SPARK-21762-missing-files-in-metrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3536406
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3536406
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3536406

Branch: refs/heads/master
Commit: e3536406ec6ff65a8b41ba2f2fd40517a760cfd6
Parents: 06df34d
Author: Steve Loughran <ste...@hortonworks.com>
Authored: Fri Oct 13 23:08:17 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri Oct 13 23:08:17 2017 -0700

----------------------------------------------------------------------
 .../datasources/BasicWriteStatsTracker.scala    |  49 ++++-
 .../BasicWriteTaskStatsTrackerSuite.scala       | 220 +++++++++++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala      |   8 +
 3 files changed, 265 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e3536406/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index b8f7d13..11af0aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -44,20 +47,32 @@ case class BasicWriteTaskStats(
  * @param hadoopConf
  */
 class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
-  extends WriteTaskStatsTracker {
+  extends WriteTaskStatsTracker with Logging {
 
   private[this] var numPartitions: Int = 0
   private[this] var numFiles: Int = 0
+  private[this] var submittedFiles: Int = 0
   private[this] var numBytes: Long = 0L
   private[this] var numRows: Long = 0L
 
-  private[this] var curFile: String = null
-
+  private[this] var curFile: Option[String] = None
 
-  private def getFileSize(filePath: String): Long = {
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * @param filePath path to the file
+   * @return the file size or None if the file was not found.
+   */
+  private def getFileSize(filePath: String): Option[Long] = {
     val path = new Path(filePath)
     val fs = path.getFileSystem(hadoopConf)
-    fs.getFileStatus(path).getLen()
+    try {
+      Some(fs.getFileStatus(path).getLen())
+    } catch {
+      case e: FileNotFoundException =>
+        // may arise against eventually consistent object stores
+        logDebug(s"File $path is not yet visible", e)
+        None
+    }
   }
 
 
@@ -70,12 +85,19 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   }
 
   override def newFile(filePath: String): Unit = {
-    if (numFiles > 0) {
-      // we assume here that we've finished writing to disk the previous file by now
-      numBytes += getFileSize(curFile)
+    statCurrentFile()
+    curFile = Some(filePath)
+    submittedFiles += 1
+  }
+
+  private def statCurrentFile(): Unit = {
+    curFile.foreach { path =>
+      getFileSize(path).foreach { len =>
+        numBytes += len
+        numFiles += 1
+      }
+      curFile = None
     }
-    curFile = filePath
-    numFiles += 1
   }
 
   override def newRow(row: InternalRow): Unit = {
@@ -83,8 +105,11 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   }
 
   override def getFinalStats(): WriteTaskStats = {
-    if (numFiles > 0) {
-      numBytes += getFileSize(curFile)
+    statCurrentFile()
+    if (submittedFiles != numFiles) {
+      logInfo(s"Expected $submittedFiles files, but only saw $numFiles. " +
+        "This could be due to the output format not writing empty files, " +
+        "or files being not immediately visible in the filesystem.")
     }
     BasicWriteTaskStats(numPartitions, numFiles, numBytes, numRows)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e3536406/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
new file mode 100644
index 0000000..bf3c8ed
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.nio.charset.Charset
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
+
+/**
+ * Test how BasicWriteTaskStatsTracker handles files.
+ *
+ * Two different datasets are written (alongside 0), one of
+ * length 10, one of 3. They were chosen to be distinct enough
+ * that it is straightforward to determine which file lengths were added
+ * from the sum of all files added. Lengths like "10" and "5" would
+ * be less informative.
+ */
+class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
+
+  private val tempDir = Utils.createTempDir()
+  private val tempDirPath = new Path(tempDir.toURI)
+  private val conf = new Configuration()
+  private val localfs = tempDirPath.getFileSystem(conf)
+  private val data1 = "0123456789".getBytes(Charset.forName("US-ASCII"))
+  private val data2 = "012".getBytes(Charset.forName("US-ASCII"))
+  private val len1 = data1.length
+  private val len2 = data2.length
+
+  /**
+   * In teardown delete the temp dir.
+   */
+  protected override def afterAll(): Unit = {
+    Utils.deleteRecursively(tempDir)
+  }
+
+  /**
+   * Assert that the stats match that expected.
+   * @param tracker tracker to check
+   * @param files number of files expected
+   * @param bytes total number of bytes expected
+   */
+  private def assertStats(
+      tracker: BasicWriteTaskStatsTracker,
+      files: Int,
+      bytes: Int): Unit = {
+    val stats = finalStatus(tracker)
+    assert(files === stats.numFiles, "Wrong number of files")
+    assert(bytes === stats.numBytes, "Wrong byte count of file size")
+  }
+
+  private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = {
+    tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
+  }
+
+  test("No files in run") {
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    assertStats(tracker, 0, 0)
+  }
+
+  test("Missing File") {
+    val missing = new Path(tempDirPath, "missing")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(missing.toString)
+    assertStats(tracker, 0, 0)
+  }
+
+  test("Empty filename is forwarded") {
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile("")
+    intercept[IllegalArgumentException] {
+      finalStatus(tracker)
+    }
+  }
+
+  test("Null filename is only picked up in final status") {
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(null)
+    intercept[IllegalArgumentException] {
+      finalStatus(tracker)
+    }
+  }
+
+  test("0 byte file") {
+    val file = new Path(tempDirPath, "file0")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file.toString)
+    touch(file)
+    assertStats(tracker, 1, 0)
+  }
+
+  test("File with data") {
+    val file = new Path(tempDirPath, "file-with-data")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file.toString)
+    write1(file)
+    assertStats(tracker, 1, len1)
+  }
+
+  test("Open file") {
+    val file = new Path(tempDirPath, "file-open")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file.toString)
+    val stream = localfs.create(file, true)
+    try {
+      assertStats(tracker, 1, 0)
+      stream.write(data1)
+      stream.flush()
+      assert(1 === finalStatus(tracker).numFiles, "Wrong number of files")
+    } finally {
+      stream.close()
+    }
+  }
+
+  test("Two files") {
+    val file1 = new Path(tempDirPath, "f-2-1")
+    val file2 = new Path(tempDirPath, "f-2-2")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file1.toString)
+    write1(file1)
+    tracker.newFile(file2.toString)
+    write2(file2)
+    assertStats(tracker, 2, len1 + len2)
+  }
+
+  test("Three files, last one empty") {
+    val file1 = new Path(tempDirPath, "f-3-1")
+    val file2 = new Path(tempDirPath, "f-3-2")
+    val file3 = new Path(tempDirPath, "f-3-2")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file1.toString)
+    write1(file1)
+    tracker.newFile(file2.toString)
+    write2(file2)
+    tracker.newFile(file3.toString)
+    touch(file3)
+    assertStats(tracker, 3, len1 + len2)
+  }
+
+  test("Three files, one not found") {
+    val file1 = new Path(tempDirPath, "f-4-1")
+    val file2 = new Path(tempDirPath, "f-4-2")
+    val file3 = new Path(tempDirPath, "f-3-2")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    // file 1
+    tracker.newFile(file1.toString)
+    write1(file1)
+
+    // file 2 is noted, but not created
+    tracker.newFile(file2.toString)
+
+    // file 3 is noted & then created
+    tracker.newFile(file3.toString)
+    write2(file3)
+
+    // the expected size is file1 + file3; only two files are reported
+    // as found
+    assertStats(tracker, 2, len1 + len2)
+  }
+
+  /**
+   * Write a 0-byte file.
+   * @param file file path
+   */
+  private def touch(file: Path): Unit = {
+    localfs.create(file, true).close()
+  }
+
+  /**
+   * Write a byte array.
+   * @param file path to file
+   * @param data data
+   * @return bytes written
+   */
+  private def write(file: Path, data: Array[Byte]): Integer = {
+    val stream = localfs.create(file, true)
+    try {
+      stream.write(data)
+    } finally {
+      stream.close()
+    }
+    data.length
+  }
+
+  /**
+   * Write a data1 array.
+   * @param file file
+   */
+  private def write1(file: Path): Unit = {
+    write(file, data1)
+  }
+
+  /**
+   * Write a data2 array.
+   *
+   * @param file file
+   */
+  private def write2(file: Path): Unit = {
+    write(file, data2)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e3536406/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 94fa43d..60935c3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2110,4 +2110,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  Seq("orc", "parquet", "csv", "json", "text").foreach { format =>
+    test(s"Writing empty datasets should not fail - $format") {
+      withTempDir { dir =>
+        Seq("str").toDS.limit(0).write.format(format).save(dir.getCanonicalPath + "/tmp")
+      }
+    }
+  }
 }

