HIVE-15951 : Make sure base persist directory is unique and deleted (Slim Bouguerra via Ashutosh Chauhan)

Change-Id: I7bfda5a1d5e2ca8324ab67b73145c50416ffe808
Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6d9c12f2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6d9c12f2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6d9c12f2

Branch: refs/heads/branch-2.2
Commit: 6d9c12f28e498e3503eee7d40ea82ada581d82c0
Parents: 299926e
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Thu Feb 16 14:27:00 2017 -0800
Committer: Owen O'Malley <omal...@apache.org>
Committed: Tue Mar 28 14:02:43 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/io/DruidRecordWriter.java | 25 ++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6d9c12f2/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 1601a9a..5f107da 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -25,6 +25,7 @@ import io.druid.segment.realtime.plumber.Committers;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.LinearShardSpec;
 import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
@@ -40,9 +41,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 public class DruidRecordWriter implements RecordWriter<NullWritable, 
DruidWritable>,
@@ -73,16 +76,19 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
           final Path segmentsDescriptorsDir,
           final FileSystem fileSystem
   ) {
+    File basePersistDir = new 
File(realtimeTuningConfig.getBasePersistDirectory(),
+            UUID.randomUUID().toString()
+    );
     this.tuningConfig = Preconditions
-            .checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is 
null");
+            
.checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir),
+                    "realtimeTuningConfig is null"
+            );
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is 
null");
+
     appenderator = Appenderators
-            .createOffline(this.dataSchema,
-                    tuningConfig,
-                    new FireDepartmentMetrics(), dataSegmentPusher,
-                    DruidStorageHandlerUtils.JSON_MAPPER,
-                    DruidStorageHandlerUtils.INDEX_IO,
-                    DruidStorageHandlerUtils.INDEX_MERGER_V9
+            .createOffline(this.dataSchema, tuningConfig, new 
FireDepartmentMetrics(),
+                    dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER,
+                    DruidStorageHandlerUtils.INDEX_IO, 
DruidStorageHandlerUtils.INDEX_MERGER_V9
             );
     Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize need 
to be greater than 0");
     this.maxPartitionSize = maxPartitionSize;
@@ -243,6 +249,11 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
     } catch (InterruptedException e) {
       Throwables.propagate(e);
     } finally {
+      try {
+        FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
+      } catch (Exception e){
+        LOG.error("error cleaning of base persist directory", e);
+      }
       appenderator.close();
     }
   }

Reply via email to