HIVE-17523: Insert into Druid table hangs HiveServer2 in an infinite loop
(Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/365c0310
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/365c0310
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/365c0310

Branch: refs/heads/hive-14535
Commit: 365c031077ef5d99f623634fe8516e52d5dd7b02
Parents: f6b4f9e
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Tue Sep 12 18:14:00 2017 -0700
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Thu Sep 28 08:07:01 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  |  59 ++-
 .../hive/druid/DruidStorageHandlerUtils.java    | 471 +++++++++----------
 .../hive/druid/TestDruidStorageHandler.java     | 153 ++++--
 3 files changed, 373 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
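
The heart of the fix: commitInsertTable could previously leave HiveServer2 waiting on the Druid coordinator with no way out, and failures during publishing did not always clean up the staging directory. The patch adds an early return when hive.druid.maxTries is 0 (as the updated test config shows), downgrades the polling log noise, and moves cleanup into a finally block. The sketch below is only an illustration of that bounded, skippable wait; the Coordinator interface and waitForLoad method are hypothetical names, not the handler's actual code, which issues HTTP calls to the coordinator instead.

// Illustrative sketch (not the handler's exact code) of a coordinator wait
// that can be skipped or bounded instead of looping forever.
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class CoordinatorWaitSketch {

  /** Hypothetical view of the coordinator; the real handler polls segment load status over HTTP. */
  interface Coordinator {
    boolean allSegmentsLoaded() throws IOException;
  }

  static void waitForLoad(Coordinator coordinator, int maxTries) throws InterruptedException {
    if (maxTries == 0) {
      // hive.druid.maxTries = 0: segments are already published, return immediately.
      return;
    }
    for (int attempt = 0; attempt < maxTries; attempt++) {
      try {
        if (coordinator.allSegmentsLoaded()) {
          return; // data is queryable, done
        }
      } catch (IOException e) {
        return; // coordinator unreachable: skip waiting rather than spin
      }
      TimeUnit.SECONDS.sleep(1);
    }
    // Giving up after maxTries is not fatal; the segments are already in the metadata store.
  }
}
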


http://git-wip-us.apache.org/repos/asf/hive/blob/365c0310/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index da6d493..62e146d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hive.druid;
 
+import com.google.common.collect.Lists;
 import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
 import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
+import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.storage.hdfs.HdfsDataSegmentPusher;
+import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
 
 import org.apache.commons.lang3.StringUtils;
@@ -74,6 +78,7 @@ import com.metamx.http.client.HttpClientInit;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -281,17 +286,34 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
-    LOG.info("Committing table {} to the druid metastore", table.getDbName());
+    final String dataSourceName = 
table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+    final List<DataSegment> segmentList = Lists.newArrayList();
     final Path tableDir = getSegmentDescriptorDir();
+    console.logInfo(String.format("Committing hive table {} druid data source 
{} to the druid metadata store",
+            table.getTableName(), dataSourceName
+    ));
+    try {
+      
segmentList.addAll(DruidStorageHandlerUtils.getPublishedSegments(tableDir, 
getConf()));
+    } catch (IOException e) {
+      LOG.error("Failed to load segments descriptor from directory {}", 
tableDir.toString());
+      Throwables.propagate(e);
+      cleanWorkingDir();
+    }
     try {
-      List<DataSegment> segmentList = DruidStorageHandlerUtils
-              .getPublishedSegments(tableDir, getConf());
-      LOG.info("Found {} segments under path {}", segmentList.size(), tableDir);
-      final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
       final String segmentDirectory =
               table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
                       ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
                       : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+      LOG.info(
+              String.format("Will move [%s] druid segments from [%s] to [%s]",
+                      segmentList.size(),
+                      getStagingWorkingDir(),
+                      segmentDirectory
+
+              ));
+      HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig();
+      pusherConfig.setStorageDirectory(segmentDirectory);
+      DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, getConf(), DruidStorageHandlerUtils.JSON_MAPPER);
       DruidStorageHandlerUtils.publishSegments(
               connector,
               druidMetadataStorageTablesConfig,
@@ -299,13 +321,26 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
               segmentList,
               overwrite,
               segmentDirectory,
-              getConf()
+              getConf(),
+              dataSegmentPusher
 
       );
+    } catch (CallbackFailedException | IOException e ) {
+      LOG.error("Failed to publish segments");
+      if (e instanceof CallbackFailedException)  {
+        Throwables.propagate(e.getCause());
+      }
+      Throwables.propagate(e);
+    } finally {
+      cleanWorkingDir();
+    }
       final String coordinatorAddress = HiveConf
               .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
       int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
-      LOG.info("checking load status from coordinator {}", coordinatorAddress);
+      if (maxTries == 0) {
+        return;
+      }
+      LOG.debug("checking load status from coordinator {}", coordinatorAddress);
 
       String coordinatorResponse = null;
       try {
@@ -314,12 +349,12 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
         ), input -> input instanceof IOException, maxTries);
       } catch (Exception e) {
         console.printInfo(
-                "Will skip waiting for data loading");
+                "Will skip waiting for data loading, coordinator unavailable");
         return;
       }
       if (Strings.isNullOrEmpty(coordinatorResponse)) {
         console.printInfo(
-                "Will skip waiting for data loading");
+                "Will skip waiting for data loading empty response from 
coordinator");
         return;
       }
       console.printInfo(
@@ -380,12 +415,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
                 setOfUrls.size(), segmentList.size()
         ));
       }
-    } catch (IOException e) {
-      LOG.error("Exception while commit", e);
-      Throwables.propagate(e);
-    } finally {
-      cleanWorkingDir();
-    }
   }
 
   @VisibleForTesting
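
Besides the tightened error handling, the handler now delegates segment path and load-spec construction to Druid's own HdfsDataSegmentPusher instead of the hand-rolled finalPathForSegment helper (removed from DruidStorageHandlerUtils in the next file). The sketch below mirrors the construction shown in the hunks above and is only a sketch: it assumes the druid-hdfs-storage extension is on the classpath (as the new imports require) and that segmentDirectory and the segment come from the table properties and the staged descriptors.

// Sketch of the pusher wiring introduced above; a minimal, hedged example,
// not the full commitInsertTable logic.
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.storage.hdfs.HdfsDataSegmentPusher;
import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

class PusherWiringSketch {
  /** Builds an HDFS pusher rooted at the table's segment directory, as the patch does. */
  static DataSegmentPusher buildPusher(Configuration conf, String segmentDirectory, ObjectMapper jsonMapper) {
    HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig();
    pusherConfig.setStorageDirectory(segmentDirectory);
    return new HdfsDataSegmentPusher(pusherConfig, conf, jsonMapper);
  }

  /** Final index.zip location for a segment, computed by the pusher itself so Hive and Druid agree. */
  static Path finalIndexPath(DataSegmentPusher pusher, DataSegment segment) {
    return new Path(pusher.getPathForHadoop(), pusher.makeIndexPathName(segment, "index.zip"));
  }
}
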

http://git-wip-us.apache.org/repos/asf/hive/blob/365c0310/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 7169140..4852ff1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -41,7 +41,6 @@ import io.druid.timeline.partition.NumberedShardSpec;
 import io.druid.timeline.partition.PartitionChunk;
 import io.druid.timeline.partition.ShardSpec;
 
-import org.apache.calcite.adapter.druid.LocalInterval;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -56,7 +55,6 @@ import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -79,7 +77,6 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.chrono.ISOChronology;
-import org.joda.time.format.ISODateTimeFormat;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
@@ -114,7 +111,6 @@ import java.util.TimeZone;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.Nullable;
 
 /**
  * Utils class for Druid storage handler.
@@ -123,13 +119,19 @@ public final class DruidStorageHandlerUtils {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DruidStorageHandlerUtils.class);
 
+  private static final int NUM_RETRIES = 8;
+  private static final int SECONDS_BETWEEN_RETRIES = 2;
+  private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
+  private static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
   private static final String SMILE_CONTENT_TYPE = 
"application/x-jackson-smile";
   public static final String DEFAULT_TIMESTAMP_COLUMN = "__time";
-
+  public static final String INDEX_ZIP = "index.zip";
+  public static final String DESCRIPTOR_JSON = "descriptor.json";
   public static final Interval DEFAULT_INTERVAL = new Interval(
           new DateTime("1900-01-01", ISOChronology.getInstanceUTC()),
           new DateTime("3000-01-01", ISOChronology.getInstanceUTC())
   ).withChronology(ISOChronology.getInstanceUTC());
+
   /**
    * Mapper to use to serialize/deserialize Druid objects (JSON)
    */
@@ -140,8 +142,7 @@ public final class DruidStorageHandlerUtils {
    */
   public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new 
SmileFactory());
 
-  static
-  {
+  static {
     // This is needed for serde of PagingSpec as it uses JacksonInject for 
injecting SelectQueryConfig
     InjectableValues.Std injectableValues = new InjectableValues.Std()
             .addValue(SelectQueryConfig.class, new SelectQueryConfig(false))
@@ -151,16 +152,22 @@ public final class DruidStorageHandlerUtils {
     HiveDruidSerializationModule hiveDruidSerializationModule = new 
HiveDruidSerializationModule();
     JSON_MAPPER.registerModule(hiveDruidSerializationModule);
     SMILE_MAPPER.registerModule(hiveDruidSerializationModule);
+    // Register the shard sub type to be used by the mapper
+    JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, 
"linear"));
+    // set the timezone of the object mapper
+    // THIS IS NOT WORKING workaround is to set it as part of java opts 
-Duser.timezone="UTC"
+    JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
+    try {
+      // No operation emitter will be used by some internal druid classes.
+      EmittingLogger.registerEmitter(
+              new ServiceEmitter("druid-hive-indexer", 
InetAddress.getLocalHost().getHostName(),
+                      new NoopEmitter()
+              ));
+    } catch (UnknownHostException e) {
+      throw Throwables.propagate(e);
+    }
   }
 
-  private static final int NUM_RETRIES = 8;
-
-  private static final int SECONDS_BETWEEN_RETRIES = 2;
-
-  private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
-
-  private static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
-
   /**
    * Used by druid to perform IO on indexes
    */
@@ -183,23 +190,6 @@ public final class DruidStorageHandlerUtils {
    */
   public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = 
Interners.newWeakInterner();
 
-  static {
-    // Register the shard sub type to be used by the mapper
-    JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, 
"linear"));
-    // set the timezone of the object mapper
-    // THIS IS NOT WORKING workaround is to set it as part of java opts 
-Duser.timezone="UTC"
-    JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
-    try {
-      // No operation emitter will be used by some internal druid classes.
-      EmittingLogger.registerEmitter(
-              new ServiceEmitter("druid-hive-indexer", 
InetAddress.getLocalHost().getHostName(),
-                      new NoopEmitter()
-              ));
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
   /**
    * Method that creates a request for Druid JSON query (using SMILE).
    *
@@ -287,29 +277,26 @@ public final class DruidStorageHandlerUtils {
   )
           throws IOException {
     final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(
-            DataPusher.class, new DataPusher() {
-              @Override
-              public long push() throws IOException {
-                try {
-                  if (outputFS.exists(descriptorPath)) {
-                    if (!outputFS.delete(descriptorPath, false)) {
-                      throw new IOException(
-                              String.format("Failed to delete descriptor at 
[%s]", descriptorPath));
-                    }
-                  }
-                  try (final OutputStream descriptorOut = outputFS.create(
-                          descriptorPath,
-                          true,
-                          DEFAULT_FS_BUFFER_SIZE
-                  )) {
-                    JSON_MAPPER.writeValue(descriptorOut, segment);
-                    descriptorOut.flush();
+            DataPusher.class, () -> {
+              try {
+                if (outputFS.exists(descriptorPath)) {
+                  if (!outputFS.delete(descriptorPath, false)) {
+                    throw new IOException(
+                            String.format("Failed to delete descriptor at 
[%s]", descriptorPath));
                   }
-                } catch (RuntimeException | IOException ex) {
-                  throw ex;
                 }
-                return -1;
+                try (final OutputStream descriptorOut = outputFS.create(
+                        descriptorPath,
+                        true,
+                        DEFAULT_FS_BUFFER_SIZE
+                )) {
+                  JSON_MAPPER.writeValue(descriptorOut, segment);
+                  descriptorOut.flush();
+                }
+              } catch (RuntimeException | IOException ex) {
+                throw ex;
               }
+              return -1;
             },
             RetryPolicies
                     .exponentialBackoffRetry(NUM_RETRIES, 
SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
@@ -327,31 +314,25 @@ public final class DruidStorageHandlerUtils {
           final MetadataStorageTablesConfig metadataStorageTablesConfig
   ) {
     return connector.getDBI().withHandle(
-            new HandleCallback<List<String>>() {
-              @Override
-              public List<String> withHandle(Handle handle) throws Exception {
-                return handle.createQuery(
-                        String.format("SELECT DISTINCT(datasource) FROM %s 
WHERE used = true",
-                                metadataStorageTablesConfig.getSegmentsTable()
-                        ))
-                        .fold(Lists.<String>newArrayList(),
-                                new Folder3<ArrayList<String>, Map<String, 
Object>>() {
-                                  @Override
-                                  public ArrayList<String> 
fold(ArrayList<String> druidDataSources,
-                                          Map<String, Object> stringObjectMap,
-                                          FoldController foldController,
-                                          StatementContext statementContext
-                                  ) throws SQLException {
-                                    druidDataSources.add(
-                                            
MapUtils.getString(stringObjectMap, "datasource")
-                                    );
-                                    return druidDataSources;
-                                  }
-                                }
-                        );
-
-              }
-            }
+            (HandleCallback<List<String>>) handle -> handle.createQuery(
+                    String.format("SELECT DISTINCT(datasource) FROM %s WHERE 
used = true",
+                            metadataStorageTablesConfig.getSegmentsTable()
+                    ))
+                    .fold(Lists.<String>newArrayList(),
+                            new Folder3<ArrayList<String>, Map<String, 
Object>>() {
+                              @Override
+                              public ArrayList<String> fold(ArrayList<String> 
druidDataSources,
+                                      Map<String, Object> stringObjectMap,
+                                      FoldController foldController,
+                                      StatementContext statementContext
+                              ) throws SQLException {
+                                druidDataSources.add(
+                                        MapUtils.getString(stringObjectMap, 
"datasource")
+                                );
+                                return druidDataSources;
+                              }
+                            }
+                    )
     );
   }
 
@@ -372,12 +353,9 @@ public final class DruidStorageHandlerUtils {
       }
 
       connector.getDBI().withHandle(
-              new HandleCallback<Void>() {
-                @Override
-                public Void withHandle(Handle handle) throws Exception {
-                  disableDataSourceWithHandle(handle, 
metadataStorageTablesConfig, dataSource);
-                  return null;
-                }
+              (HandleCallback<Void>) handle -> {
+                disableDataSourceWithHandle(handle, 
metadataStorageTablesConfig, dataSource);
+                return null;
               }
       );
 
@@ -394,134 +372,131 @@ public final class DruidStorageHandlerUtils {
           final List<DataSegment> segments,
           boolean overwrite,
           String segmentDirectory,
-          Configuration conf) {
-    try {
-      connector.getDBI().inTransaction(
-              new TransactionCallback<Void>() {
-                @Override
-                public Void inTransaction(Handle handle, TransactionStatus 
transactionStatus)
-                        throws Exception {
-                  final List<DataSegment> finalSegmentsToPublish = 
Lists.newArrayList();
-                  VersionedIntervalTimeline<String, DataSegment> timeline;
-                  if (overwrite) {
-                    disableDataSourceWithHandle(handle, 
metadataStorageTablesConfig, dataSource);
-                    // When overwriting start with empty timeline, as we are 
overwriting segments with new versions
-                    timeline = new VersionedIntervalTimeline<>(
-                            Ordering.natural()
-                    );
-                  } else {
-                    // Append Mode - build a timeline of existing segments in 
metadata storage.
-                    Interval indexedInterval = JodaUtils
-                            .umbrellaInterval(Iterables.transform(segments,
-                                    new Function<DataSegment, Interval>() {
-                                      @Override
-                                      public Interval apply(@Nullable 
DataSegment input) {
-                                        return input.getInterval();
-                                      }
-                                    }));
-                    timeline = getTimelineForIntervalWithHandle(
-                            handle, dataSource, indexedInterval, 
metadataStorageTablesConfig);
-                  }
-                  for (DataSegment segment : segments) {
-                    List<TimelineObjectHolder<String, DataSegment>> 
existingChunks = timeline
-                            .lookup(segment.getInterval());
-                    if (existingChunks.size() > 1) {
-                      // Not possible to expand since we have more than one 
chunk with a single segment.
-                      // This is the case when user wants to append a segment 
with coarser granularity.
-                      // e.g If metadata storage already has segments for with 
granularity HOUR and segments to append have DAY granularity.
-                      // Druid shard specs does not support multiple 
partitions for same interval with different granularity.
-                      throw new IllegalStateException(
-                              String.format(
-                                      "Cannot allocate new segment for 
dataSource[%s], interval[%s], already have [%,d] chunks. Not possible to append 
new segment.",
-                                      dataSource,
-                                      segment.getInterval(),
-                                      existingChunks.size()
-                              )
-                      );
-                    }
-                    // Find out the segment with latest version and maximum 
partition number
-                    SegmentIdentifier max = null;
-                    final ShardSpec newShardSpec;
-                    final String newVersion;
-                    if (!existingChunks.isEmpty()) {
-                      // Some existing chunk, Find max
-                      TimelineObjectHolder<String, DataSegment> existingHolder 
= Iterables
-                              .getOnlyElement(existingChunks);
-                      for (PartitionChunk<DataSegment> existing : 
existingHolder.getObject()) {
-                        if (max == null ||
-                                max.getShardSpec().getPartitionNum() < 
existing.getObject()
-                                        .getShardSpec()
-                                        .getPartitionNum()) {
-                          max = 
SegmentIdentifier.fromDataSegment(existing.getObject());
-                        }
-                      }
-                    }
-
-                    if (max == null) {
-                      // No existing shard present in the database, use the 
current version.
-                      newShardSpec = segment.getShardSpec();
-                      newVersion = segment.getVersion();
-                    } else  {
-                      // use version of existing max segment to generate new 
shard spec
-                      newShardSpec = 
getNextPartitionShardSpec(max.getShardSpec());
-                      newVersion = max.getVersion();
-                    }
-
-                    DataSegment publishedSegment = 
publishSegmentWithShardSpec(segment,
-                            newShardSpec, newVersion,
-                            segmentDirectory, 
getPath(segment).getFileSystem(conf));
-                    finalSegmentsToPublish.add(publishedSegment);
-                    timeline.add(publishedSegment.getInterval(), 
publishedSegment.getVersion(),
-                            
publishedSegment.getShardSpec().createChunk(publishedSegment));
-
-                  }
-
-                  // Publish new segments to metadata storage
-                  final PreparedBatch batch = handle.prepareBatch(
+          Configuration conf,
+          DataSegmentPusher dataSegmentPusher
+  ) throws CallbackFailedException {
+    connector.getDBI().inTransaction(
+            (TransactionCallback<Void>) (handle, transactionStatus) -> {
+              final List<DataSegment> finalSegmentsToPublish = 
Lists.newArrayList();
+              VersionedIntervalTimeline<String, DataSegment> timeline;
+              if (overwrite) {
+                disableDataSourceWithHandle(handle, 
metadataStorageTablesConfig, dataSource);
+                // When overwriting start with empty timeline, as we are 
overwriting segments with new versions
+                timeline = new VersionedIntervalTimeline<>(
+                        Ordering.natural()
+                );
+              } else {
+                // Append Mode - build a timeline of existing segments in 
metadata storage.
+                Interval indexedInterval = JodaUtils
+                        .umbrellaInterval(Iterables.transform(segments,
+                                input -> input.getInterval()
+                        ));
+                LOG.info("Building timeline for umbrella Interval [{}]", 
indexedInterval);
+                timeline = getTimelineForIntervalWithHandle(
+                        handle, dataSource, indexedInterval, 
metadataStorageTablesConfig);
+              }
+              for (DataSegment segment : segments) {
+                List<TimelineObjectHolder<String, DataSegment>> existingChunks 
= timeline
+                        .lookup(segment.getInterval());
+                if (existingChunks.size() > 1) {
+                  // Not possible to expand since we have more than one chunk 
with a single segment.
+                  // This is the case when user wants to append a segment with 
coarser granularity.
+                  // e.g If metadata storage already has segments for with 
granularity HOUR and segments to append have DAY granularity.
+                  // Druid shard specs does not support multiple partitions 
for same interval with different granularity.
+                  throw new IllegalStateException(
                           String.format(
-                                  "INSERT INTO %1$s (id, dataSource, 
created_date, start, \"end\", partitioned, version, used, payload) "
-                                      + "VALUES (:id, :dataSource, 
:created_date, :start, :end, :partitioned, :version, :used, :payload)",
-                                  
metadataStorageTablesConfig.getSegmentsTable()
+                                  "Cannot allocate new segment for 
dataSource[%s], interval[%s], already have [%,d] chunks. Not possible to append 
new segment.",
+                                  dataSource,
+                                  segment.getInterval(),
+                                  existingChunks.size()
                           )
-
                   );
-
-                  for (final DataSegment segment : finalSegmentsToPublish) {
-
-                    batch.add(
-                            new ImmutableMap.Builder<String, Object>()
-                                    .put("id", segment.getIdentifier())
-                                    .put("dataSource", segment.getDataSource())
-                                    .put("created_date", new 
DateTime().toString())
-                                    .put("start", 
segment.getInterval().getStart().toString())
-                                    .put("end", 
segment.getInterval().getEnd().toString())
-                                    .put("partitioned",
-                                            (segment.getShardSpec() instanceof 
NoneShardSpec) ?
-                                                    false :
-                                                    true)
-                                    .put("version", segment.getVersion())
-                                    .put("used", true)
-                                    .put("payload", 
JSON_MAPPER.writeValueAsBytes(segment))
-                                    .build()
-                    );
-
-                    LOG.info("Published {}", segment.getIdentifier());
-
+                }
+                // Find out the segment with latest version and maximum 
partition number
+                SegmentIdentifier max = null;
+                final ShardSpec newShardSpec;
+                final String newVersion;
+                if (!existingChunks.isEmpty()) {
+                  // Some existing chunk, Find max
+                  TimelineObjectHolder<String, DataSegment> existingHolder = 
Iterables
+                          .getOnlyElement(existingChunks);
+                  for (PartitionChunk<DataSegment> existing : 
existingHolder.getObject()) {
+                    if (max == null ||
+                            max.getShardSpec().getPartitionNum() < 
existing.getObject()
+                                    .getShardSpec()
+                                    .getPartitionNum()) {
+                      max = 
SegmentIdentifier.fromDataSegment(existing.getObject());
+                    }
                   }
-                  batch.execute();
+                }
 
-                  return null;
+                if (max == null) {
+                  // No existing shard present in the database, use the 
current version.
+                  newShardSpec = segment.getShardSpec();
+                  newVersion = segment.getVersion();
+                } else {
+                  // use version of existing max segment to generate new shard 
spec
+                  newShardSpec = getNextPartitionShardSpec(max.getShardSpec());
+                  newVersion = max.getVersion();
                 }
+                DataSegment publishedSegment = publishSegmentWithShardSpec(
+                        segment,
+                        newShardSpec,
+                        newVersion,
+                        getPath(segment).getFileSystem(conf),
+                        dataSegmentPusher
+                );
+                finalSegmentsToPublish.add(publishedSegment);
+                timeline.add(
+                        publishedSegment.getInterval(),
+                        publishedSegment.getVersion(),
+                        
publishedSegment.getShardSpec().createChunk(publishedSegment)
+                );
+
               }
-      );
-    } catch (CallbackFailedException e) {
-      LOG.error("Exception while publishing segments", e.getCause());
-      throw Throwables.propagate(e.getCause());
-    }
+
+              // Publish new segments to metadata storage
+              final PreparedBatch batch = handle.prepareBatch(
+                      String.format(
+                              "INSERT INTO %1$s (id, dataSource, created_date, 
start, \"end\", partitioned, version, used, payload) "
+                                      + "VALUES (:id, :dataSource, 
:created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                              metadataStorageTablesConfig.getSegmentsTable()
+                      )
+
+              );
+
+              for (final DataSegment segment : finalSegmentsToPublish) {
+
+                batch.add(
+                        new ImmutableMap.Builder<String, Object>()
+                                .put("id", segment.getIdentifier())
+                                .put("dataSource", segment.getDataSource())
+                                .put("created_date", new DateTime().toString())
+                                .put("start", 
segment.getInterval().getStart().toString())
+                                .put("end", 
segment.getInterval().getEnd().toString())
+                                .put("partitioned",
+                                        (segment.getShardSpec() instanceof 
NoneShardSpec) ?
+                                                false :
+                                                true
+                                )
+                                .put("version", segment.getVersion())
+                                .put("used", true)
+                                .put("payload", 
JSON_MAPPER.writeValueAsBytes(segment))
+                                .build()
+                );
+
+                LOG.info("Published {}", segment.getIdentifier());
+              }
+              batch.execute();
+
+              return null;
+            }
+    );
   }
 
   public static void disableDataSourceWithHandle(Handle handle,
-          MetadataStorageTablesConfig metadataStorageTablesConfig, String 
dataSource) {
+          MetadataStorageTablesConfig metadataStorageTablesConfig, String 
dataSource
+  ) {
     handle.createStatement(
             String.format("UPDATE %s SET used=false WHERE dataSource = 
:dataSource",
                     metadataStorageTablesConfig.getSegmentsTable()
@@ -542,44 +517,31 @@ public final class DruidStorageHandlerUtils {
           final MetadataStorageTablesConfig metadataStorageTablesConfig, final 
String dataSource
   ) {
     List<DataSegment> segmentList = connector.retryTransaction(
-            new TransactionCallback<List<DataSegment>>() {
-              @Override
-              public List<DataSegment> inTransaction(
-                      Handle handle, TransactionStatus status
-              ) throws Exception {
-                return handle
-                        .createQuery(String.format(
-                                "SELECT payload FROM %s WHERE dataSource = 
:dataSource",
-                                metadataStorageTablesConfig.getSegmentsTable()
-                        ))
-                        .setFetchSize(getStreamingFetchSize(connector))
-                        .bind("dataSource", dataSource)
-                        .map(ByteArrayMapper.FIRST)
-                        .fold(
-                                new ArrayList<DataSegment>(),
-                                new Folder3<List<DataSegment>, byte[]>() {
-                                  @Override
-                                  public List<DataSegment> 
fold(List<DataSegment> accumulator,
-                                          byte[] payload, FoldController 
control,
-                                          StatementContext ctx
-                                  ) throws SQLException {
-                                    try {
-                                      final DataSegment segment = 
DATA_SEGMENT_INTERNER.intern(
-                                              JSON_MAPPER.readValue(
-                                                      payload,
-                                                      DataSegment.class
-                                              ));
-
-                                      accumulator.add(segment);
-                                      return accumulator;
-                                    } catch (Exception e) {
-                                      throw new SQLException(e.toString());
-                                    }
-                                  }
-                                }
-                        );
-              }
-            }
+            (handle, status) -> handle
+                    .createQuery(String.format(
+                            "SELECT payload FROM %s WHERE dataSource = 
:dataSource",
+                            metadataStorageTablesConfig.getSegmentsTable()
+                    ))
+                    .setFetchSize(getStreamingFetchSize(connector))
+                    .bind("dataSource", dataSource)
+                    .map(ByteArrayMapper.FIRST)
+                    .fold(
+                            new ArrayList<>(),
+                            (Folder3<List<DataSegment>, byte[]>) (accumulator, 
payload, control, ctx) -> {
+                              try {
+                                final DataSegment segment = 
DATA_SEGMENT_INTERNER.intern(
+                                        JSON_MAPPER.readValue(
+                                                payload,
+                                                DataSegment.class
+                                        ));
+
+                                accumulator.add(segment);
+                                return accumulator;
+                              } catch (Exception e) {
+                                throw new SQLException(e.toString());
+                              }
+                            }
+                    )
             , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES);
     return segmentList;
   }
@@ -672,7 +634,8 @@ public final class DruidStorageHandlerUtils {
                 DataSegment.class
         );
         timeline.add(segment.getInterval(), segment.getVersion(),
-                segment.getShardSpec().createChunk(segment));
+                segment.getShardSpec().createChunk(segment)
+        );
       }
     } finally {
       dbSegments.close();
@@ -689,8 +652,8 @@ public final class DruidStorageHandlerUtils {
   }
 
   public static DataSegment publishSegmentWithShardSpec(DataSegment segment, 
ShardSpec shardSpec,
-          String version, String segmentDirectory, FileSystem fs)
-          throws IOException {
+          String version, FileSystem fs, DataSegmentPusher dataSegmentPusher
+  ) throws IOException {
     boolean retry = true;
     DataSegment.Builder dataSegmentBuilder = new 
DataSegment.Builder(segment).version(version);
     Path finalPath = null;
@@ -698,8 +661,9 @@ public final class DruidStorageHandlerUtils {
       retry = false;
       dataSegmentBuilder.shardSpec(shardSpec);
       final Path intermediatePath = getPath(segment);
-      finalPath = finalPathForSegment(segmentDirectory, 
dataSegmentBuilder.build());
 
+      finalPath = new Path(dataSegmentPusher.getPathForHadoop(), 
dataSegmentPusher
+              .makeIndexPathName(dataSegmentBuilder.build(), 
DruidStorageHandlerUtils.INDEX_ZIP));
       // Create parent if it does not exist, recreation is not an error
       fs.mkdirs(finalPath.getParent());
 
@@ -718,34 +682,21 @@ public final class DruidStorageHandlerUtils {
       }
     }
     DataSegment dataSegment = dataSegmentBuilder
-            .loadSpec(ImmutableMap.<String, Object>of("type", "hdfs", "path", 
finalPath.toString()))
+            .loadSpec(dataSegmentPusher.makeLoadSpec(finalPath.toUri()))
             .build();
 
-    writeSegmentDescriptor(fs, dataSegment, new Path(finalPath.getParent(), 
"descriptor.json"));
+    writeSegmentDescriptor(fs, dataSegment, new Path(finalPath.getParent(), 
DruidStorageHandlerUtils.DESCRIPTOR_JSON));
 
     return dataSegment;
   }
 
-  public static Path finalPathForSegment(String segmentDirectory, DataSegment 
segment) {
-    String path = DataSegmentPusher.JOINER.join(
-            segment.getDataSource(),
-            String.format(
-                    "%s_%s",
-                    
segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
-                    
segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
-            ),
-            segment.getVersion().replaceAll(":", "_")
-    );
-
-    return new Path(String.format("%s/%s/index.zip", segmentDirectory, path));
-  }
-
   private static ShardSpec getNextPartitionShardSpec(ShardSpec shardSpec) {
     if (shardSpec instanceof LinearShardSpec) {
       return new LinearShardSpec(shardSpec.getPartitionNum() + 1);
     } else if (shardSpec instanceof NumberedShardSpec) {
       return new NumberedShardSpec(shardSpec.getPartitionNum(),
-              ((NumberedShardSpec) shardSpec).getPartitions());
+              ((NumberedShardSpec) shardSpec).getPartitions()
+      );
     } else {
       // Druid only support appending more partitions to Linear and Numbered 
ShardSpecs.
       throw new IllegalStateException(
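
The rewritten publishSegments above does all of its metadata work inside a single inTransaction callback: in append mode it builds a timeline of the segments already registered for the umbrella interval, then gives each new segment the latest existing version and the next partition number. The tiny sketch below illustrates only the LinearShardSpec branch of that bump (the case getNextPartitionShardSpec handles first); the NextShardSketch class name is illustrative, not part of the patch.

// Minimal illustration of the append-mode partition bump for LinearShardSpec.
import io.druid.timeline.partition.LinearShardSpec;

class NextShardSketch {
  static LinearShardSpec nextPartition(LinearShardSpec latestExisting) {
    if (latestExisting == null) {
      // No existing chunk for this interval: the patch keeps the segment's own
      // shard spec, which is typically partition 0.
      return new LinearShardSpec(0);
    }
    // Existing chunk found: reuse its version (not shown here) and take the
    // next partition number so the new segment appends rather than overwrites.
    return new LinearShardSpec(latestExisting.getPartitionNum() + 1);
  }
}
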

http://git-wip-us.apache.org/repos/asf/hive/blob/365c0310/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 25f96b3..56b437d 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -115,6 +115,7 @@ public class TestDruidStorageHandler {
     config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), 
tableWorkingPath);
     config.set(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY),
             new Path(tableWorkingPath, "finalSegmentDir").toString());
+    config.set("hive.druid.maxTries", "0");
     druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
             derbyConnectorRule.metadataTablesConfigSupplier().get()
@@ -245,26 +246,35 @@ public class TestDruidStorageHandler {
     DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector();
     MetadataStorageTablesConfig metadataStorageTablesConfig = 
derbyConnectorRule
             .metadataTablesConfigSupplier().get();
-
     druidStorageHandler.preCreateTable(tableMock);
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     Path taskDirPath = new Path(tableWorkingPath, 
druidStorageHandler.makeStagingName());
-    DataSegment dataSegment = createSegment(new Path(taskDirPath, 
"index.zip").toString(),
-            new Interval(180, 250), "v1", new LinearShardSpec(0));
-    Path descriptorPath = 
DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
-            new Path(taskDirPath, 
DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
-    );
+    HdfsDataSegmentPusherConfig pusherConfig = new 
HdfsDataSegmentPusherConfig();
+    
pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)));
+    DataSegmentPusher dataSegmentPusher = new 
HdfsDataSegmentPusher(pusherConfig, config, 
DruidStorageHandlerUtils.JSON_MAPPER);
+
+    // This create and publish the segment to be overwritten
     List<DataSegment> existingSegments = Arrays
-            .asList(createSegment(new Path(taskDirPath, 
"index_old.zip").toString(),
+            .asList(createSegment(new Path(taskDirPath, 
DruidStorageHandlerUtils.INDEX_ZIP).toString(),
                     new Interval(100, 150), "v0", new LinearShardSpec(0)));
     DruidStorageHandlerUtils
             .publishSegments(connector, metadataStorageTablesConfig, 
DATA_SOURCE_NAME,
                     existingSegments,
                     true,
                     taskDirPath.toString(),
-                    config
+                    config,
+                    dataSegmentPusher
             );
+
+    // This creates and publish new segment
+    DataSegment dataSegment = createSegment(new Path(taskDirPath, 
DruidStorageHandlerUtils.INDEX_ZIP).toString(),
+            new Interval(180, 250), "v1", new LinearShardSpec(0));
+
+    Path descriptorPath = 
DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+            new Path(taskDirPath, 
DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
     DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, 
dataSegment, descriptorPath);
+
     druidStorageHandler.commitInsertTable(tableMock, true);
     Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), 
Lists.newArrayList(
             DruidStorageHandlerUtils.getAllDataSourceNames(connector,
@@ -277,13 +287,12 @@ public class TestDruidStorageHandler {
     DataSegment persistedSegment = Iterables.getOnlyElement(dataSegmentList);
     Assert.assertEquals(dataSegment, persistedSegment);
     Assert.assertEquals(dataSegment.getVersion(), 
persistedSegment.getVersion());
-    String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment(
-            
config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), 
persistedSegment)
-            .toString();
-    Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", 
expectedFinalPath),
+    Path expectedFinalHadoopPath =  new 
Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher
+            .makeIndexPathName(persistedSegment, 
DruidStorageHandlerUtils.INDEX_ZIP));
+    Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", 
expectedFinalHadoopPath.toString()),
             persistedSegment.getLoadSpec());
     Assert.assertEquals("dummySegmentData",
-            FileUtils.readFileToString(new File(expectedFinalPath)));
+            FileUtils.readFileToString(new 
File(expectedFinalHadoopPath.toUri())));
   }
 
   private List<DataSegment> getUsedSegmentsList(DerbyConnectorTestUtility 
connector,
@@ -325,16 +334,21 @@ public class TestDruidStorageHandler {
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     Path taskDirPath = new Path(tableWorkingPath, 
druidStorageHandler.makeStagingName());
     List<DataSegment> existingSegments = Arrays
-            .asList(createSegment(new Path(taskDirPath, 
"index_old.zip").toString(),
+            .asList(createSegment(new Path(taskDirPath, 
DruidStorageHandlerUtils.INDEX_ZIP).toString(),
                     new Interval(100, 150), "v0", new LinearShardSpec(1)));
+    HdfsDataSegmentPusherConfig pusherConfig = new 
HdfsDataSegmentPusherConfig();
+    
pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)));
+    DataSegmentPusher dataSegmentPusher = new 
HdfsDataSegmentPusher(pusherConfig, config, 
DruidStorageHandlerUtils.JSON_MAPPER);
+
     DruidStorageHandlerUtils
             .publishSegments(connector, metadataStorageTablesConfig, 
DATA_SOURCE_NAME,
                     existingSegments,
                     true,
                     taskDirPath.toString(),
-                    config
+                    config,
+                    dataSegmentPusher
             );
-    DataSegment dataSegment = createSegment(new Path(taskDirPath, 
"index.zip").toString(),
+    DataSegment dataSegment = createSegment(new Path(taskDirPath, 
DruidStorageHandlerUtils.INDEX_ZIP).toString(),
             new Interval(100, 150), "v1", new LinearShardSpec(0));
     Path descriptorPath = 
DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
             new Path(taskDirPath, 
DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
@@ -355,13 +369,68 @@ public class TestDruidStorageHandler {
     Assert.assertEquals("v0", persistedSegment.getVersion());
     Assert.assertTrue(persistedSegment.getShardSpec() instanceof 
LinearShardSpec);
     Assert.assertEquals(2, persistedSegment.getShardSpec().getPartitionNum());
-    String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment(
-            
config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), 
persistedSegment)
-            .toString();
-    Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", 
expectedFinalPath),
+
+    Path expectedFinalHadoopPath =  new 
Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher
+            .makeIndexPathName(persistedSegment, 
DruidStorageHandlerUtils.INDEX_ZIP));
+
+    Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", 
expectedFinalHadoopPath.toString()),
             persistedSegment.getLoadSpec());
     Assert.assertEquals("dummySegmentData",
-            FileUtils.readFileToString(new File(expectedFinalPath)));
+            FileUtils.readFileToString(new 
File(expectedFinalHadoopPath.toUri())));
+  }
+
+  @Test
+  public void testInsertIntoAppendOneMorePartition() throws MetaException, 
IOException {
+    DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector();
+    MetadataStorageTablesConfig metadataStorageTablesConfig = 
derbyConnectorRule
+            .metadataTablesConfigSupplier().get();
+    druidStorageHandler.preCreateTable(tableMock);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    Path taskDirPath = new Path(tableWorkingPath, 
druidStorageHandler.makeStagingName());
+    HdfsDataSegmentPusherConfig pusherConfig = new 
HdfsDataSegmentPusherConfig();
+    
pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)));
+    DataSegmentPusher dataSegmentPusher = new 
HdfsDataSegmentPusher(pusherConfig, config, 
DruidStorageHandlerUtils.JSON_MAPPER);
+
+    List<DataSegment> existingSegments = Arrays
+            .asList(createSegment(new Path(taskDirPath, 
DruidStorageHandlerUtils.INDEX_ZIP).toString(),
+                    new Interval(100, 150), "v0", new LinearShardSpec(0)));
+    DruidStorageHandlerUtils
+            .publishSegments(connector, metadataStorageTablesConfig, 
DATA_SOURCE_NAME,
+                    existingSegments,
+                    true,
+                    taskDirPath.toString(),
+                    config,
+                    dataSegmentPusher
+            );
+
+    DataSegment dataSegment = createSegment(new Path(taskDirPath, 
DruidStorageHandlerUtils.INDEX_ZIP).toString(),
+            new Interval(100, 150), "v0", new LinearShardSpec(0));
+    Path descriptorPath = 
DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+            new Path(taskDirPath, 
DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, 
dataSegment, descriptorPath);
+    druidStorageHandler.commitInsertTable(tableMock, false);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), 
Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(connector,
+                    metadataStorageTablesConfig
+            )).toArray());
+
+    final List<DataSegment> dataSegmentList = getUsedSegmentsList(connector,
+            metadataStorageTablesConfig);
+    Assert.assertEquals(2, dataSegmentList.size());
+
+    DataSegment persistedSegment = dataSegmentList.get(1);
+    Assert.assertEquals("v0", persistedSegment.getVersion());
+    Assert.assertTrue(persistedSegment.getShardSpec() instanceof 
LinearShardSpec);
+    Assert.assertEquals(1, persistedSegment.getShardSpec().getPartitionNum());
+
+    Path expectedFinalHadoopPath =  new 
Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher
+            .makeIndexPathName(persistedSegment, 
DruidStorageHandlerUtils.INDEX_ZIP));
+
+    Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", 
expectedFinalHadoopPath.toString()),
+            persistedSegment.getLoadSpec());
+    Assert.assertEquals("dummySegmentData",
+            FileUtils.readFileToString(new 
File(expectedFinalHadoopPath.toUri())));
   }
 
   @Test
@@ -376,12 +445,16 @@ public class TestDruidStorageHandler {
     List<DataSegment> existingSegments = Arrays
             .asList(createSegment(new Path(taskDirPath, 
"index_old.zip").toString(),
                     new Interval(100, 150), "v0", new LinearShardSpec(1)));
+    HdfsDataSegmentPusherConfig pusherConfig = new 
HdfsDataSegmentPusherConfig();
+    
pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)));
+    DataSegmentPusher dataSegmentPusher = new 
HdfsDataSegmentPusher(pusherConfig, config, 
DruidStorageHandlerUtils.JSON_MAPPER);
     DruidStorageHandlerUtils
             .publishSegments(connector, metadataStorageTablesConfig, 
DATA_SOURCE_NAME,
                     existingSegments,
                     true,
                     taskDirPath.toString(),
-                    config
+                    config,
+                    dataSegmentPusher
             );
     DataSegment dataSegment = createSegment(new Path(taskDirPath, 
"index.zip").toString(),
             new Interval(100, 150), "v1", new LinearShardSpec(0));
@@ -391,10 +464,11 @@ public class TestDruidStorageHandler {
     DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, 
dataSegment, descriptorPath);
 
     // Create segment file at the destination location with LinearShardSpec(2)
-    FileUtils.writeStringToFile(new 
File(DruidStorageHandlerUtils.finalPathForSegment(
-            
config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)),
-            createSegment(new Path(taskDirPath, 
"index_conflict.zip").toString(),
-                    new Interval(100, 150), "v1", new 
LinearShardSpec(1))).toString()), "dummy");
+    DataSegment segment = createSegment(new Path(taskDirPath, 
"index_conflict.zip").toString(),
+            new Interval(100, 150), "v1", new LinearShardSpec(1));
+    Path segmentPath = new Path(dataSegmentPusher.getPathForHadoop(), 
dataSegmentPusher.makeIndexPathName(segment, 
DruidStorageHandlerUtils.INDEX_ZIP));
+    FileUtils.writeStringToFile(new File(segmentPath.toUri()), "dummy");
+
     druidStorageHandler.commitInsertTable(tableMock, false);
     Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), 
Lists.newArrayList(
             DruidStorageHandlerUtils.getAllDataSourceNames(connector,
@@ -411,13 +485,14 @@ public class TestDruidStorageHandler {
     Assert.assertTrue(persistedSegment.getShardSpec() instanceof 
LinearShardSpec);
     // insert into should skip and increment partition number to 3
     Assert.assertEquals(2, persistedSegment.getShardSpec().getPartitionNum());
-    String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment(
-            
config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), 
persistedSegment)
-            .toString();
-    Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", 
expectedFinalPath),
+    Path expectedFinalHadoopPath =  new 
Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher
+            .makeIndexPathName(persistedSegment, 
DruidStorageHandlerUtils.INDEX_ZIP));
+
+
+    Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", 
expectedFinalHadoopPath.toString()),
             persistedSegment.getLoadSpec());
     Assert.assertEquals("dummySegmentData",
-            FileUtils.readFileToString(new File(expectedFinalPath)));
+            FileUtils.readFileToString(new 
File(expectedFinalHadoopPath.toUri())));
   }
 
   @Test(expected = IllegalStateException.class)
@@ -439,16 +514,20 @@ public class TestDruidStorageHandler {
             createSegment(new Path(taskDirPath, "index_old_3.zip").toString(),
                     new Interval(200, 300),
                     "v0", new LinearShardSpec(0)));
+    HdfsDataSegmentPusherConfig pusherConfig = new 
HdfsDataSegmentPusherConfig();
+    pusherConfig.setStorageDirectory(taskDirPath.toString());
+    DataSegmentPusher dataSegmentPusher = new 
HdfsDataSegmentPusher(pusherConfig, config, 
DruidStorageHandlerUtils.JSON_MAPPER);
     DruidStorageHandlerUtils
             .publishSegments(connector, metadataStorageTablesConfig, 
DATA_SOURCE_NAME,
                     existingSegments,
                     true,
                     taskDirPath.toString(),
-                    config
+                    config,
+                    dataSegmentPusher
             );
 
     // Try appending segment with conflicting interval
-    DataSegment conflictingSegment = createSegment(new Path(taskDirPath, 
"index.zip").toString(),
+    DataSegment conflictingSegment = createSegment(new Path(taskDirPath, 
DruidStorageHandlerUtils.INDEX_ZIP).toString(),
             new Interval(100, 300), "v1", new LinearShardSpec(0));
     Path descriptorPath = DruidStorageHandlerUtils
             .makeSegmentDescriptorOutputPath(conflictingSegment,
@@ -474,16 +553,20 @@ public class TestDruidStorageHandler {
                             new Interval(200, 250), "v0", new 
LinearShardSpec(0)),
                     createSegment(new Path(taskDirPath, 
"index_old_3.zip").toString(),
                             new Interval(250, 300), "v0", new 
LinearShardSpec(0)));
+    HdfsDataSegmentPusherConfig pusherConfig = new 
HdfsDataSegmentPusherConfig();
+    pusherConfig.setStorageDirectory(taskDirPath.toString());
+    DataSegmentPusher dataSegmentPusher = new 
HdfsDataSegmentPusher(pusherConfig, config, 
DruidStorageHandlerUtils.JSON_MAPPER);
     DruidStorageHandlerUtils
             .publishSegments(connector, metadataStorageTablesConfig, 
DATA_SOURCE_NAME,
                     existingSegments,
                     true,
                     taskDirPath.toString(),
-                    config
+                    config,
+                    dataSegmentPusher
             );
 
     // Try appending to non extendable shard spec
-    DataSegment conflictingSegment = createSegment(new Path(taskDirPath, 
"index.zip").toString(),
+    DataSegment conflictingSegment = createSegment(new Path(taskDirPath, 
DruidStorageHandlerUtils.INDEX_ZIP).toString(),
             new Interval(100, 150), "v1", new LinearShardSpec(0));
     Path descriptorPath = DruidStorageHandlerUtils
             .makeSegmentDescriptorOutputPath(conflictingSegment,
