This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3628242  [MINOR][DSTREAMS] Add DStreamCheckpointData.cleanup warning 
if delete returns false
3628242 is described below

commit 3628242bd05e181f5f1aacd072cd005b6ceec65c
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Tue Apr 2 18:34:40 2019 -0500

    [MINOR][DSTREAMS] Add DStreamCheckpointData.cleanup warning if delete 
returns false
    
    ## What changes were proposed in this pull request?
    
    While I was reviewing #24235 I've found a minor addition possibility. 
Namely `FileSystem.delete` returns a boolean which is not yet checked. In this 
PR I've added a warning message when it returns false. I've added this as MINOR 
because no control flow change introduced.
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #24263 from gaborgsomogyi/SPARK-27301-minor.
    
    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index ebfaa83..b35f7d9 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -87,9 +87,12 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
               if (fileSystem == null) {
                 fileSystem = 
path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
               }
-              fileSystem.delete(path, true)
+              if (fileSystem.delete(path, true)) {
+                logInfo("Deleted checkpoint file '" + file + "' for time " + 
time)
+              } else {
+                logWarning(s"Error deleting old checkpoint file '$file' for 
time $time")
+              }
               timeToCheckpointFile -= time
-              logInfo("Deleted checkpoint file '" + file + "' for time " + 
time)
             } catch {
               case e: Exception =>
                 logWarning("Error deleting old checkpoint file '" + file + "' 
for time " + time, e)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to