HIVE-17523 : Insert into druid table hangs Hive server2 in an infinite loop (Slim Bouguerra via Ashutosh Chauhan)
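Summary of the change: commitInsertTable() in DruidStorageHandler now delegates segment moves to Druid's HdfsDataSegmentPusher, propagates publishing failures (wrapping CallbackFailedException and IOException), always cleans the staging working directory in a finally block, and skips polling the coordinator for load status when hive.druid.maxTries is 0. The sketch below is only an illustration of that commit/cleanup control flow; the class and method names are simplified placeholders, not the actual Hive or Druid classes.

    import java.io.IOException;
    import java.util.List;

    // Minimal sketch of the commit/cleanup flow this patch moves to (placeholder names).
    public class CommitFlowSketch {

      interface SegmentPublisher {
        void publish(List<String> segmentDescriptors) throws IOException;
      }

      private final SegmentPublisher publisher;
      private final int maxTries; // analogous to hive.druid.maxTries; 0 disables load-status polling

      CommitFlowSketch(SegmentPublisher publisher, int maxTries) {
        this.publisher = publisher;
        this.maxTries = maxTries;
      }

      void commitInsert(List<String> segmentDescriptors) {
        try {
          publisher.publish(segmentDescriptors);
        } catch (IOException e) {
          // Surface the failure to the caller instead of retrying indefinitely.
          throw new RuntimeException("Failed to publish segments", e);
        } finally {
          cleanWorkingDir(); // always runs, whether publishing succeeded or failed
        }
        if (maxTries == 0) {
          return; // skip waiting for the coordinator to report the segments as loaded
        }
        waitForSegmentsToLoad(maxTries); // bounded polling, never an unbounded wait
      }

      private void cleanWorkingDir() { /* delete the staging directory */ }

      private void waitForSegmentsToLoad(int tries) { /* poll the coordinator at most 'tries' times */ }
    }

Setting the retry budget to zero, as the updated test configuration does via hive.druid.maxTries=0, makes the commit return right after publishing without entering the wait loop.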
Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/365c0310
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/365c0310
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/365c0310

Branch: refs/heads/hive-14535
Commit: 365c031077ef5d99f623634fe8516e52d5dd7b02
Parents: f6b4f9e
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Tue Sep 12 18:14:00 2017 -0700
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Thu Sep 28 08:07:01 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  |  59 ++-
 .../hive/druid/DruidStorageHandlerUtils.java    | 471 +++++++++----------
 .../hive/druid/TestDruidStorageHandler.java     | 153 ++++--
 3 files changed, 373 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/365c0310/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index da6d493..62e146d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hive.druid;

+import com.google.common.collect.Lists;
 import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
 import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
+import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.storage.hdfs.HdfsDataSegmentPusher;
+import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;

 import org.apache.commons.lang3.StringUtils;
@@ -74,6 +78,7 @@ import com.metamx.http.client.HttpClientInit;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -281,17 +286,34 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
-    LOG.info("Committing table {} to the druid metastore", table.getDbName());
+    final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+    final List<DataSegment> segmentList = Lists.newArrayList();
     final Path tableDir = getSegmentDescriptorDir();
+    console.logInfo(String.format("Committing hive table {} druid data source {} to the druid metadata store",
+            table.getTableName(), dataSourceName
+    ));
+    try {
+      segmentList.addAll(DruidStorageHandlerUtils.getPublishedSegments(tableDir, getConf()));
+    } catch (IOException e) {
+      LOG.error("Failed to load segments descriptor from directory {}", tableDir.toString());
+      Throwables.propagate(e);
+      cleanWorkingDir();
+    }
     try {
-      List<DataSegment> segmentList = DruidStorageHandlerUtils
-              .getPublishedSegments(tableDir, getConf());
-      LOG.info("Found {} segments under path {}", segmentList.size(), tableDir);
-      final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
      final String segmentDirectory =
              table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
                      ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
                      : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+      LOG.info(
+              String.format("Will move [%s] druid segments from [%s] to [%s]",
+                      segmentList.size(),
+                      getStagingWorkingDir(),
+                      segmentDirectory
+
+              ));
+      HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig();
+      pusherConfig.setStorageDirectory(segmentDirectory);
+      DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, getConf(), DruidStorageHandlerUtils.JSON_MAPPER);
       DruidStorageHandlerUtils.publishSegments(
               connector,
               druidMetadataStorageTablesConfig,
@@ -299,13 +321,26 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
               segmentList,
               overwrite,
               segmentDirectory,
-              getConf()
+              getConf(),
+              dataSegmentPusher
       );
+    } catch (CallbackFailedException | IOException e ) {
+      LOG.error("Failed to publish segments");
+      if (e instanceof CallbackFailedException) {
+        Throwables.propagate(e.getCause());
+      }
+      Throwables.propagate(e);
+    } finally {
+      cleanWorkingDir();
+    }
     final String coordinatorAddress = HiveConf
             .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
     int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
-    LOG.info("checking load status from coordinator {}", coordinatorAddress);
+    if (maxTries == 0) {
+      return;
+    }
+    LOG.debug("checking load status from coordinator {}", coordinatorAddress);
     String coordinatorResponse = null;
     try {
@@ -314,12 +349,12 @@
       ), input -> input instanceof IOException, maxTries);
     } catch (Exception e) {
       console.printInfo(
-              "Will skip waiting for data loading");
+              "Will skip waiting for data loading, coordinator unavailable");
       return;
     }
     if (Strings.isNullOrEmpty(coordinatorResponse)) {
       console.printInfo(
-              "Will skip waiting for data loading");
+              "Will skip waiting for data loading empty response from coordinator");
       return;
     }
     console.printInfo(
@@ -380,12 +415,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
               setOfUrls.size(), segmentList.size()
       ));
     }
-    } catch (IOException e) {
-      LOG.error("Exception while commit", e);
-      Throwables.propagate(e);
-    } finally {
-      cleanWorkingDir();
-    }
   }

   @VisibleForTesting


http://git-wip-us.apache.org/repos/asf/hive/blob/365c0310/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 7169140..4852ff1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -41,7 +41,6 @@ import io.druid.timeline.partition.NumberedShardSpec;
 import io.druid.timeline.partition.PartitionChunk;
 import io.druid.timeline.partition.ShardSpec;

-import org.apache.calcite.adapter.druid.LocalInterval;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -56,7 +55,6 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -79,7 +77,6 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; -import org.joda.time.format.ISODateTimeFormat; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -114,7 +111,6 @@ import java.util.TimeZone; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; /** * Utils class for Druid storage handler. @@ -123,13 +119,19 @@ public final class DruidStorageHandlerUtils { private static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandlerUtils.class); + private static final int NUM_RETRIES = 8; + private static final int SECONDS_BETWEEN_RETRIES = 2; + private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB + private static final int DEFAULT_STREAMING_RESULT_SIZE = 100; private static final String SMILE_CONTENT_TYPE = "application/x-jackson-smile"; public static final String DEFAULT_TIMESTAMP_COLUMN = "__time"; - + public static final String INDEX_ZIP = "index.zip"; + public static final String DESCRIPTOR_JSON = "descriptor.json"; public static final Interval DEFAULT_INTERVAL = new Interval( new DateTime("1900-01-01", ISOChronology.getInstanceUTC()), new DateTime("3000-01-01", ISOChronology.getInstanceUTC()) ).withChronology(ISOChronology.getInstanceUTC()); + /** * Mapper to use to serialize/deserialize Druid objects (JSON) */ @@ -140,8 +142,7 @@ public final class DruidStorageHandlerUtils { */ public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory()); - static - { + static { // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig InjectableValues.Std injectableValues = new InjectableValues.Std() .addValue(SelectQueryConfig.class, new SelectQueryConfig(false)) @@ -151,16 +152,22 @@ public final class DruidStorageHandlerUtils { HiveDruidSerializationModule hiveDruidSerializationModule = new HiveDruidSerializationModule(); JSON_MAPPER.registerModule(hiveDruidSerializationModule); SMILE_MAPPER.registerModule(hiveDruidSerializationModule); + // Register the shard sub type to be used by the mapper + JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear")); + // set the timezone of the object mapper + // THIS IS NOT WORKING workaround is to set it as part of java opts -Duser.timezone="UTC" + JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC")); + try { + // No operation emitter will be used by some internal druid classes. 
+ EmittingLogger.registerEmitter( + new ServiceEmitter("druid-hive-indexer", InetAddress.getLocalHost().getHostName(), + new NoopEmitter() + )); + } catch (UnknownHostException e) { + throw Throwables.propagate(e); + } } - private static final int NUM_RETRIES = 8; - - private static final int SECONDS_BETWEEN_RETRIES = 2; - - private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB - - private static final int DEFAULT_STREAMING_RESULT_SIZE = 100; - /** * Used by druid to perform IO on indexes */ @@ -183,23 +190,6 @@ public final class DruidStorageHandlerUtils { */ public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner(); - static { - // Register the shard sub type to be used by the mapper - JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear")); - // set the timezone of the object mapper - // THIS IS NOT WORKING workaround is to set it as part of java opts -Duser.timezone="UTC" - JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC")); - try { - // No operation emitter will be used by some internal druid classes. - EmittingLogger.registerEmitter( - new ServiceEmitter("druid-hive-indexer", InetAddress.getLocalHost().getHostName(), - new NoopEmitter() - )); - } catch (UnknownHostException e) { - throw Throwables.propagate(e); - } - } - /** * Method that creates a request for Druid JSON query (using SMILE). * @@ -287,29 +277,26 @@ public final class DruidStorageHandlerUtils { ) throws IOException { final DataPusher descriptorPusher = (DataPusher) RetryProxy.create( - DataPusher.class, new DataPusher() { - @Override - public long push() throws IOException { - try { - if (outputFS.exists(descriptorPath)) { - if (!outputFS.delete(descriptorPath, false)) { - throw new IOException( - String.format("Failed to delete descriptor at [%s]", descriptorPath)); - } - } - try (final OutputStream descriptorOut = outputFS.create( - descriptorPath, - true, - DEFAULT_FS_BUFFER_SIZE - )) { - JSON_MAPPER.writeValue(descriptorOut, segment); - descriptorOut.flush(); + DataPusher.class, () -> { + try { + if (outputFS.exists(descriptorPath)) { + if (!outputFS.delete(descriptorPath, false)) { + throw new IOException( + String.format("Failed to delete descriptor at [%s]", descriptorPath)); } - } catch (RuntimeException | IOException ex) { - throw ex; } - return -1; + try (final OutputStream descriptorOut = outputFS.create( + descriptorPath, + true, + DEFAULT_FS_BUFFER_SIZE + )) { + JSON_MAPPER.writeValue(descriptorOut, segment); + descriptorOut.flush(); + } + } catch (RuntimeException | IOException ex) { + throw ex; } + return -1; }, RetryPolicies .exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS) @@ -327,31 +314,25 @@ public final class DruidStorageHandlerUtils { final MetadataStorageTablesConfig metadataStorageTablesConfig ) { return connector.getDBI().withHandle( - new HandleCallback<List<String>>() { - @Override - public List<String> withHandle(Handle handle) throws Exception { - return handle.createQuery( - String.format("SELECT DISTINCT(datasource) FROM %s WHERE used = true", - metadataStorageTablesConfig.getSegmentsTable() - )) - .fold(Lists.<String>newArrayList(), - new Folder3<ArrayList<String>, Map<String, Object>>() { - @Override - public ArrayList<String> fold(ArrayList<String> druidDataSources, - Map<String, Object> stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) throws SQLException { - druidDataSources.add( - MapUtils.getString(stringObjectMap, "datasource") - ); - 
return druidDataSources; - } - } - ); - - } - } + (HandleCallback<List<String>>) handle -> handle.createQuery( + String.format("SELECT DISTINCT(datasource) FROM %s WHERE used = true", + metadataStorageTablesConfig.getSegmentsTable() + )) + .fold(Lists.<String>newArrayList(), + new Folder3<ArrayList<String>, Map<String, Object>>() { + @Override + public ArrayList<String> fold(ArrayList<String> druidDataSources, + Map<String, Object> stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException { + druidDataSources.add( + MapUtils.getString(stringObjectMap, "datasource") + ); + return druidDataSources; + } + } + ) ); } @@ -372,12 +353,9 @@ public final class DruidStorageHandlerUtils { } connector.getDBI().withHandle( - new HandleCallback<Void>() { - @Override - public Void withHandle(Handle handle) throws Exception { - disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); - return null; - } + (HandleCallback<Void>) handle -> { + disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); + return null; } ); @@ -394,134 +372,131 @@ public final class DruidStorageHandlerUtils { final List<DataSegment> segments, boolean overwrite, String segmentDirectory, - Configuration conf) { - try { - connector.getDBI().inTransaction( - new TransactionCallback<Void>() { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) - throws Exception { - final List<DataSegment> finalSegmentsToPublish = Lists.newArrayList(); - VersionedIntervalTimeline<String, DataSegment> timeline; - if (overwrite) { - disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); - // When overwriting start with empty timeline, as we are overwriting segments with new versions - timeline = new VersionedIntervalTimeline<>( - Ordering.natural() - ); - } else { - // Append Mode - build a timeline of existing segments in metadata storage. - Interval indexedInterval = JodaUtils - .umbrellaInterval(Iterables.transform(segments, - new Function<DataSegment, Interval>() { - @Override - public Interval apply(@Nullable DataSegment input) { - return input.getInterval(); - } - })); - timeline = getTimelineForIntervalWithHandle( - handle, dataSource, indexedInterval, metadataStorageTablesConfig); - } - for (DataSegment segment : segments) { - List<TimelineObjectHolder<String, DataSegment>> existingChunks = timeline - .lookup(segment.getInterval()); - if (existingChunks.size() > 1) { - // Not possible to expand since we have more than one chunk with a single segment. - // This is the case when user wants to append a segment with coarser granularity. - // e.g If metadata storage already has segments for with granularity HOUR and segments to append have DAY granularity. - // Druid shard specs does not support multiple partitions for same interval with different granularity. - throw new IllegalStateException( - String.format( - "Cannot allocate new segment for dataSource[%s], interval[%s], already have [%,d] chunks. 
Not possible to append new segment.", - dataSource, - segment.getInterval(), - existingChunks.size() - ) - ); - } - // Find out the segment with latest version and maximum partition number - SegmentIdentifier max = null; - final ShardSpec newShardSpec; - final String newVersion; - if (!existingChunks.isEmpty()) { - // Some existing chunk, Find max - TimelineObjectHolder<String, DataSegment> existingHolder = Iterables - .getOnlyElement(existingChunks); - for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) { - if (max == null || - max.getShardSpec().getPartitionNum() < existing.getObject() - .getShardSpec() - .getPartitionNum()) { - max = SegmentIdentifier.fromDataSegment(existing.getObject()); - } - } - } - - if (max == null) { - // No existing shard present in the database, use the current version. - newShardSpec = segment.getShardSpec(); - newVersion = segment.getVersion(); - } else { - // use version of existing max segment to generate new shard spec - newShardSpec = getNextPartitionShardSpec(max.getShardSpec()); - newVersion = max.getVersion(); - } - - DataSegment publishedSegment = publishSegmentWithShardSpec(segment, - newShardSpec, newVersion, - segmentDirectory, getPath(segment).getFileSystem(conf)); - finalSegmentsToPublish.add(publishedSegment); - timeline.add(publishedSegment.getInterval(), publishedSegment.getVersion(), - publishedSegment.getShardSpec().createChunk(publishedSegment)); - - } - - // Publish new segments to metadata storage - final PreparedBatch batch = handle.prepareBatch( + Configuration conf, + DataSegmentPusher dataSegmentPusher + ) throws CallbackFailedException { + connector.getDBI().inTransaction( + (TransactionCallback<Void>) (handle, transactionStatus) -> { + final List<DataSegment> finalSegmentsToPublish = Lists.newArrayList(); + VersionedIntervalTimeline<String, DataSegment> timeline; + if (overwrite) { + disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); + // When overwriting start with empty timeline, as we are overwriting segments with new versions + timeline = new VersionedIntervalTimeline<>( + Ordering.natural() + ); + } else { + // Append Mode - build a timeline of existing segments in metadata storage. + Interval indexedInterval = JodaUtils + .umbrellaInterval(Iterables.transform(segments, + input -> input.getInterval() + )); + LOG.info("Building timeline for umbrella Interval [{}]", indexedInterval); + timeline = getTimelineForIntervalWithHandle( + handle, dataSource, indexedInterval, metadataStorageTablesConfig); + } + for (DataSegment segment : segments) { + List<TimelineObjectHolder<String, DataSegment>> existingChunks = timeline + .lookup(segment.getInterval()); + if (existingChunks.size() > 1) { + // Not possible to expand since we have more than one chunk with a single segment. + // This is the case when user wants to append a segment with coarser granularity. + // e.g If metadata storage already has segments for with granularity HOUR and segments to append have DAY granularity. + // Druid shard specs does not support multiple partitions for same interval with different granularity. + throw new IllegalStateException( String.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - metadataStorageTablesConfig.getSegmentsTable() + "Cannot allocate new segment for dataSource[%s], interval[%s], already have [%,d] chunks. 
Not possible to append new segment.", + dataSource, + segment.getInterval(), + existingChunks.size() ) - ); - - for (final DataSegment segment : finalSegmentsToPublish) { - - batch.add( - new ImmutableMap.Builder<String, Object>() - .put("id", segment.getIdentifier()) - .put("dataSource", segment.getDataSource()) - .put("created_date", new DateTime().toString()) - .put("start", segment.getInterval().getStart().toString()) - .put("end", segment.getInterval().getEnd().toString()) - .put("partitioned", - (segment.getShardSpec() instanceof NoneShardSpec) ? - false : - true) - .put("version", segment.getVersion()) - .put("used", true) - .put("payload", JSON_MAPPER.writeValueAsBytes(segment)) - .build() - ); - - LOG.info("Published {}", segment.getIdentifier()); - + } + // Find out the segment with latest version and maximum partition number + SegmentIdentifier max = null; + final ShardSpec newShardSpec; + final String newVersion; + if (!existingChunks.isEmpty()) { + // Some existing chunk, Find max + TimelineObjectHolder<String, DataSegment> existingHolder = Iterables + .getOnlyElement(existingChunks); + for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) { + if (max == null || + max.getShardSpec().getPartitionNum() < existing.getObject() + .getShardSpec() + .getPartitionNum()) { + max = SegmentIdentifier.fromDataSegment(existing.getObject()); + } } - batch.execute(); + } - return null; + if (max == null) { + // No existing shard present in the database, use the current version. + newShardSpec = segment.getShardSpec(); + newVersion = segment.getVersion(); + } else { + // use version of existing max segment to generate new shard spec + newShardSpec = getNextPartitionShardSpec(max.getShardSpec()); + newVersion = max.getVersion(); } + DataSegment publishedSegment = publishSegmentWithShardSpec( + segment, + newShardSpec, + newVersion, + getPath(segment).getFileSystem(conf), + dataSegmentPusher + ); + finalSegmentsToPublish.add(publishedSegment); + timeline.add( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().createChunk(publishedSegment) + ); + } - ); - } catch (CallbackFailedException e) { - LOG.error("Exception while publishing segments", e.getCause()); - throw Throwables.propagate(e.getCause()); - } + + // Publish new segments to metadata storage + final PreparedBatch batch = handle.prepareBatch( + String.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + metadataStorageTablesConfig.getSegmentsTable() + ) + + ); + + for (final DataSegment segment : finalSegmentsToPublish) { + + batch.add( + new ImmutableMap.Builder<String, Object>() + .put("id", segment.getIdentifier()) + .put("dataSource", segment.getDataSource()) + .put("created_date", new DateTime().toString()) + .put("start", segment.getInterval().getStart().toString()) + .put("end", segment.getInterval().getEnd().toString()) + .put("partitioned", + (segment.getShardSpec() instanceof NoneShardSpec) ? 
+ false : + true + ) + .put("version", segment.getVersion()) + .put("used", true) + .put("payload", JSON_MAPPER.writeValueAsBytes(segment)) + .build() + ); + + LOG.info("Published {}", segment.getIdentifier()); + } + batch.execute(); + + return null; + } + ); } public static void disableDataSourceWithHandle(Handle handle, - MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource) { + MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource + ) { handle.createStatement( String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", metadataStorageTablesConfig.getSegmentsTable() @@ -542,44 +517,31 @@ public final class DruidStorageHandlerUtils { final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource ) { List<DataSegment> segmentList = connector.retryTransaction( - new TransactionCallback<List<DataSegment>>() { - @Override - public List<DataSegment> inTransaction( - Handle handle, TransactionStatus status - ) throws Exception { - return handle - .createQuery(String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource", - metadataStorageTablesConfig.getSegmentsTable() - )) - .setFetchSize(getStreamingFetchSize(connector)) - .bind("dataSource", dataSource) - .map(ByteArrayMapper.FIRST) - .fold( - new ArrayList<DataSegment>(), - new Folder3<List<DataSegment>, byte[]>() { - @Override - public List<DataSegment> fold(List<DataSegment> accumulator, - byte[] payload, FoldController control, - StatementContext ctx - ) throws SQLException { - try { - final DataSegment segment = DATA_SEGMENT_INTERNER.intern( - JSON_MAPPER.readValue( - payload, - DataSegment.class - )); - - accumulator.add(segment); - return accumulator; - } catch (Exception e) { - throw new SQLException(e.toString()); - } - } - } - ); - } - } + (handle, status) -> handle + .createQuery(String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource", + metadataStorageTablesConfig.getSegmentsTable() + )) + .setFetchSize(getStreamingFetchSize(connector)) + .bind("dataSource", dataSource) + .map(ByteArrayMapper.FIRST) + .fold( + new ArrayList<>(), + (Folder3<List<DataSegment>, byte[]>) (accumulator, payload, control, ctx) -> { + try { + final DataSegment segment = DATA_SEGMENT_INTERNER.intern( + JSON_MAPPER.readValue( + payload, + DataSegment.class + )); + + accumulator.add(segment); + return accumulator; + } catch (Exception e) { + throw new SQLException(e.toString()); + } + } + ) , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES); return segmentList; } @@ -672,7 +634,8 @@ public final class DruidStorageHandlerUtils { DataSegment.class ); timeline.add(segment.getInterval(), segment.getVersion(), - segment.getShardSpec().createChunk(segment)); + segment.getShardSpec().createChunk(segment) + ); } } finally { dbSegments.close(); @@ -689,8 +652,8 @@ public final class DruidStorageHandlerUtils { } public static DataSegment publishSegmentWithShardSpec(DataSegment segment, ShardSpec shardSpec, - String version, String segmentDirectory, FileSystem fs) - throws IOException { + String version, FileSystem fs, DataSegmentPusher dataSegmentPusher + ) throws IOException { boolean retry = true; DataSegment.Builder dataSegmentBuilder = new DataSegment.Builder(segment).version(version); Path finalPath = null; @@ -698,8 +661,9 @@ public final class DruidStorageHandlerUtils { retry = false; dataSegmentBuilder.shardSpec(shardSpec); final Path intermediatePath = getPath(segment); - finalPath = finalPathForSegment(segmentDirectory, dataSegmentBuilder.build()); + 
finalPath = new Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher + .makeIndexPathName(dataSegmentBuilder.build(), DruidStorageHandlerUtils.INDEX_ZIP)); // Create parent if it does not exist, recreation is not an error fs.mkdirs(finalPath.getParent()); @@ -718,34 +682,21 @@ public final class DruidStorageHandlerUtils { } } DataSegment dataSegment = dataSegmentBuilder - .loadSpec(ImmutableMap.<String, Object>of("type", "hdfs", "path", finalPath.toString())) + .loadSpec(dataSegmentPusher.makeLoadSpec(finalPath.toUri())) .build(); - writeSegmentDescriptor(fs, dataSegment, new Path(finalPath.getParent(), "descriptor.json")); + writeSegmentDescriptor(fs, dataSegment, new Path(finalPath.getParent(), DruidStorageHandlerUtils.DESCRIPTOR_JSON)); return dataSegment; } - public static Path finalPathForSegment(String segmentDirectory, DataSegment segment) { - String path = DataSegmentPusher.JOINER.join( - segment.getDataSource(), - String.format( - "%s_%s", - segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()), - segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime()) - ), - segment.getVersion().replaceAll(":", "_") - ); - - return new Path(String.format("%s/%s/index.zip", segmentDirectory, path)); - } - private static ShardSpec getNextPartitionShardSpec(ShardSpec shardSpec) { if (shardSpec instanceof LinearShardSpec) { return new LinearShardSpec(shardSpec.getPartitionNum() + 1); } else if (shardSpec instanceof NumberedShardSpec) { return new NumberedShardSpec(shardSpec.getPartitionNum(), - ((NumberedShardSpec) shardSpec).getPartitions()); + ((NumberedShardSpec) shardSpec).getPartitions() + ); } else { // Druid only support appending more partitions to Linear and Numbered ShardSpecs. throw new IllegalStateException( http://git-wip-us.apache.org/repos/asf/hive/blob/365c0310/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java index 25f96b3..56b437d 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java @@ -115,6 +115,7 @@ public class TestDruidStorageHandler { config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath); config.set(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY), new Path(tableWorkingPath, "finalSegmentDir").toString()); + config.set("hive.druid.maxTries", "0"); druidStorageHandler = new DruidStorageHandler( derbyConnectorRule.getConnector(), derbyConnectorRule.metadataTablesConfigSupplier().get() @@ -245,26 +246,35 @@ public class TestDruidStorageHandler { DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule .metadataTablesConfigSupplier().get(); - druidStorageHandler.preCreateTable(tableMock); LocalFileSystem localFileSystem = FileSystem.getLocal(config); Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); - DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), - new Interval(180, 250), "v1", new LinearShardSpec(0)); - Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, - new Path(taskDirPath, 
DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) - ); + HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig(); + pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY))); + DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); + + // This create and publish the segment to be overwritten List<DataSegment> existingSegments = Arrays - .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(), + .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), new Interval(100, 150), "v0", new LinearShardSpec(0))); DruidStorageHandlerUtils .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, taskDirPath.toString(), - config + config, + dataSegmentPusher ); + + // This creates and publish new segment + DataSegment dataSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), + new Interval(180, 250), "v1", new LinearShardSpec(0)); + + Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); + druidStorageHandler.commitInsertTable(tableMock, true); Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( DruidStorageHandlerUtils.getAllDataSourceNames(connector, @@ -277,13 +287,12 @@ public class TestDruidStorageHandler { DataSegment persistedSegment = Iterables.getOnlyElement(dataSegmentList); Assert.assertEquals(dataSegment, persistedSegment); Assert.assertEquals(dataSegment.getVersion(), persistedSegment.getVersion()); - String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment( - config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), persistedSegment) - .toString(); - Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalPath), + Path expectedFinalHadoopPath = new Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher + .makeIndexPathName(persistedSegment, DruidStorageHandlerUtils.INDEX_ZIP)); + Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalHadoopPath.toString()), persistedSegment.getLoadSpec()); Assert.assertEquals("dummySegmentData", - FileUtils.readFileToString(new File(expectedFinalPath))); + FileUtils.readFileToString(new File(expectedFinalHadoopPath.toUri()))); } private List<DataSegment> getUsedSegmentsList(DerbyConnectorTestUtility connector, @@ -325,16 +334,21 @@ public class TestDruidStorageHandler { LocalFileSystem localFileSystem = FileSystem.getLocal(config); Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); List<DataSegment> existingSegments = Arrays - .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(), + .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), new Interval(100, 150), "v0", new LinearShardSpec(1))); + HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig(); + pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY))); + DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); + DruidStorageHandlerUtils .publishSegments(connector, metadataStorageTablesConfig, 
DATA_SOURCE_NAME, existingSegments, true, taskDirPath.toString(), - config + config, + dataSegmentPusher ); - DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + DataSegment dataSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), new Interval(100, 150), "v1", new LinearShardSpec(0)); Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) @@ -355,13 +369,68 @@ public class TestDruidStorageHandler { Assert.assertEquals("v0", persistedSegment.getVersion()); Assert.assertTrue(persistedSegment.getShardSpec() instanceof LinearShardSpec); Assert.assertEquals(2, persistedSegment.getShardSpec().getPartitionNum()); - String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment( - config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), persistedSegment) - .toString(); - Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalPath), + + Path expectedFinalHadoopPath = new Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher + .makeIndexPathName(persistedSegment, DruidStorageHandlerUtils.INDEX_ZIP)); + + Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalHadoopPath.toString()), persistedSegment.getLoadSpec()); Assert.assertEquals("dummySegmentData", - FileUtils.readFileToString(new File(expectedFinalPath))); + FileUtils.readFileToString(new File(expectedFinalHadoopPath.toUri()))); + } + + @Test + public void testInsertIntoAppendOneMorePartition() throws MetaException, IOException { + DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); + MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule + .metadataTablesConfigSupplier().get(); + druidStorageHandler.preCreateTable(tableMock); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig(); + pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY))); + DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); + + List<DataSegment> existingSegments = Arrays + .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), + new Interval(100, 150), "v0", new LinearShardSpec(0))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config, + dataSegmentPusher + ); + + DataSegment dataSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), + new Interval(100, 150), "v0", new LinearShardSpec(0)); + Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); + druidStorageHandler.commitInsertTable(tableMock, false); + Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( + DruidStorageHandlerUtils.getAllDataSourceNames(connector, + metadataStorageTablesConfig + )).toArray()); + + final List<DataSegment> dataSegmentList = getUsedSegmentsList(connector, + 
metadataStorageTablesConfig); + Assert.assertEquals(2, dataSegmentList.size()); + + DataSegment persistedSegment = dataSegmentList.get(1); + Assert.assertEquals("v0", persistedSegment.getVersion()); + Assert.assertTrue(persistedSegment.getShardSpec() instanceof LinearShardSpec); + Assert.assertEquals(1, persistedSegment.getShardSpec().getPartitionNum()); + + Path expectedFinalHadoopPath = new Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher + .makeIndexPathName(persistedSegment, DruidStorageHandlerUtils.INDEX_ZIP)); + + Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalHadoopPath.toString()), + persistedSegment.getLoadSpec()); + Assert.assertEquals("dummySegmentData", + FileUtils.readFileToString(new File(expectedFinalHadoopPath.toUri()))); } @Test @@ -376,12 +445,16 @@ public class TestDruidStorageHandler { List<DataSegment> existingSegments = Arrays .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(), new Interval(100, 150), "v0", new LinearShardSpec(1))); + HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig(); + pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY))); + DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); DruidStorageHandlerUtils .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, taskDirPath.toString(), - config + config, + dataSegmentPusher ); DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), new Interval(100, 150), "v1", new LinearShardSpec(0)); @@ -391,10 +464,11 @@ public class TestDruidStorageHandler { DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); // Create segment file at the destination location with LinearShardSpec(2) - FileUtils.writeStringToFile(new File(DruidStorageHandlerUtils.finalPathForSegment( - config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), - createSegment(new Path(taskDirPath, "index_conflict.zip").toString(), - new Interval(100, 150), "v1", new LinearShardSpec(1))).toString()), "dummy"); + DataSegment segment = createSegment(new Path(taskDirPath, "index_conflict.zip").toString(), + new Interval(100, 150), "v1", new LinearShardSpec(1)); + Path segmentPath = new Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher.makeIndexPathName(segment, DruidStorageHandlerUtils.INDEX_ZIP)); + FileUtils.writeStringToFile(new File(segmentPath.toUri()), "dummy"); + druidStorageHandler.commitInsertTable(tableMock, false); Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( DruidStorageHandlerUtils.getAllDataSourceNames(connector, @@ -411,13 +485,14 @@ public class TestDruidStorageHandler { Assert.assertTrue(persistedSegment.getShardSpec() instanceof LinearShardSpec); // insert into should skip and increment partition number to 3 Assert.assertEquals(2, persistedSegment.getShardSpec().getPartitionNum()); - String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment( - config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), persistedSegment) - .toString(); - Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalPath), + Path expectedFinalHadoopPath = new Path(dataSegmentPusher.getPathForHadoop(), dataSegmentPusher + .makeIndexPathName(persistedSegment, DruidStorageHandlerUtils.INDEX_ZIP)); + + + 
Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalHadoopPath.toString()), persistedSegment.getLoadSpec()); Assert.assertEquals("dummySegmentData", - FileUtils.readFileToString(new File(expectedFinalPath))); + FileUtils.readFileToString(new File(expectedFinalHadoopPath.toUri()))); } @Test(expected = IllegalStateException.class) @@ -439,16 +514,20 @@ public class TestDruidStorageHandler { createSegment(new Path(taskDirPath, "index_old_3.zip").toString(), new Interval(200, 300), "v0", new LinearShardSpec(0))); + HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig(); + pusherConfig.setStorageDirectory(taskDirPath.toString()); + DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); DruidStorageHandlerUtils .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, taskDirPath.toString(), - config + config, + dataSegmentPusher ); // Try appending segment with conflicting interval - DataSegment conflictingSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + DataSegment conflictingSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), new Interval(100, 300), "v1", new LinearShardSpec(0)); Path descriptorPath = DruidStorageHandlerUtils .makeSegmentDescriptorOutputPath(conflictingSegment, @@ -474,16 +553,20 @@ public class TestDruidStorageHandler { new Interval(200, 250), "v0", new LinearShardSpec(0)), createSegment(new Path(taskDirPath, "index_old_3.zip").toString(), new Interval(250, 300), "v0", new LinearShardSpec(0))); + HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig(); + pusherConfig.setStorageDirectory(taskDirPath.toString()); + DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); DruidStorageHandlerUtils .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, taskDirPath.toString(), - config + config, + dataSegmentPusher ); // Try appending to non extendable shard spec - DataSegment conflictingSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + DataSegment conflictingSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), new Interval(100, 150), "v1", new LinearShardSpec(0)); Path descriptorPath = DruidStorageHandlerUtils .makeSegmentDescriptorOutputPath(conflictingSegment,