HIVE-15951 : Make sure base persist directory is unique and deleted (Slim Bouguerra via Ashutosh Chauhan)
Change-Id: I7bfda5a1d5e2ca8324ab67b73145c50416ffe808
Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6d9c12f2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6d9c12f2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6d9c12f2

Branch: refs/heads/branch-2.2
Commit: 6d9c12f28e498e3503eee7d40ea82ada581d82c0
Parents: 299926e
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Thu Feb 16 14:27:00 2017 -0800
Committer: Owen O'Malley <omal...@apache.org>
Committed: Tue Mar 28 14:02:43 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/io/DruidRecordWriter.java | 25 ++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/6d9c12f2/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 1601a9a..5f107da 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -25,6 +25,7 @@ import io.druid.segment.realtime.plumber.Committers;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.LinearShardSpec;
 import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
@@ -40,9 +41,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritable>,
@@ -73,16 +76,19 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
           final Path segmentsDescriptorsDir,
           final FileSystem fileSystem
   ) {
+    File basePersistDir = new File(realtimeTuningConfig.getBasePersistDirectory(),
+            UUID.randomUUID().toString()
+    );
     this.tuningConfig = Preconditions
-            .checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is null");
+            .checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir),
+                    "realtimeTuningConfig is null"
+            );
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
+
     appenderator = Appenderators
-            .createOffline(this.dataSchema,
-                    tuningConfig,
-                    new FireDepartmentMetrics(), dataSegmentPusher,
-                    DruidStorageHandlerUtils.JSON_MAPPER,
-                    DruidStorageHandlerUtils.INDEX_IO,
-                    DruidStorageHandlerUtils.INDEX_MERGER_V9
+            .createOffline(this.dataSchema, tuningConfig, new FireDepartmentMetrics(),
+                    dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER,
+                    DruidStorageHandlerUtils.INDEX_IO, DruidStorageHandlerUtils.INDEX_MERGER_V9
             );
     Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize need to be greater than 0");
     this.maxPartitionSize = maxPartitionSize;
@@ -243,6 +249,11 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
     } catch (InterruptedException e) {
       Throwables.propagate(e);
     } finally {
+      try {
+        FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
+      } catch (Exception e){
+        LOG.error("error cleaning of base persist directory", e);
+      }
       appenderator.close();
     }
   }