This is an automated email from the ASF dual-hosted git repository. jihoonson pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 4e23c11 Make IngestSegmentFirehoseFactory splittable for parallel ingestion (#7048) 4e23c11 is described below commit 4e23c11345de694c997881d72ca6c668b3fb795b Author: David Glasser <glas...@apollographql.com> AuthorDate: Tue Apr 2 14:59:17 2019 -0700 Make IngestSegmentFirehoseFactory splittable for parallel ingestion (#7048) * Make IngestSegmentFirehoseFactory splittable for parallel ingestion * Code review feedback - Get rid of WindowedSegment - Don't document 'segments' parameter or support splitting firehoses that use it - Require 'intervals' in WindowedSegmentId (since it won't be written by hand) * Add missing @JsonProperty * Integration test passes * Add unit test * Remove two FIXME comments from CompactionTask I'd like to leave this PR in a potentially mergeable state, but I still would appreciate reviewer eyes on the questions I'm removing here. * Updates from code review --- docs/content/ingestion/firehose.md | 4 +- docs/content/ingestion/native_tasks.md | 2 +- .../druid/indexing/common/task/CompactionTask.java | 2 + .../firehose/IngestSegmentFirehoseFactory.java | 293 ++++++++++++++++++--- .../druid/indexing/firehose/WindowedSegmentId.java | 61 +++++ .../firehose/IngestSegmentFirehoseFactoryTest.java | 2 + .../IngestSegmentFirehoseFactoryTimelineTest.java | 67 ++++- .../druid/tests/indexer/ITParallelIndexTest.java | 14 +- ...kipedia_parallel_ingest_segment_index_task.json | 63 +++++ .../client/coordinator/CoordinatorClient.java | 32 +++ 10 files changed, 495 insertions(+), 45 deletions(-) diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md index b451c25..8cfdc72 100644 --- a/docs/content/ingestion/firehose.md +++ b/docs/content/ingestion/firehose.md @@ -87,7 +87,8 @@ The below configurations can be optionally used for tuning the firehose performa ### IngestSegmentFirehose This Firehose can be used to read the data from existing druid segments. 
-It can be used ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment. +It can be used to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment. +This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task). A sample ingest firehose spec is shown below - ```json @@ -106,6 +107,7 @@ A sample ingest firehose spec is shown below - |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no| |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no| |filter| See [Filters](../querying/filters.html)|no| +|maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no| #### SqlFirehose diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index f61b395..adadcf3 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -45,7 +45,7 @@ task statuses. If one of them fails, it retries the failed task until the retryi If all worker tasks succeed, then it collects the reported list of generated segments and publishes those segments at once. To use this task, the `firehose` in `ioConfig` should be _splittable_. If it's not, this task runs sequentially. 
The -current splittable fireshoses are [`LocalFirehose`](./firehose.html#localfirehose), [`HttpFirehose`](./firehose.html#httpfirehose) +current splittable firehoses are [`LocalFirehose`](./firehose.html#localfirehose), [`IngestSegmentFirehose`](./firehose.html#ingestsegmentfirehose), [`HttpFirehose`](./firehose.html#httpfirehose) , [`StaticS3Firehose`](../development/extensions-core/s3.html#statics3firehose), [`StaticAzureBlobStoreFirehose`](../development/extensions-contrib/azure.html#staticazureblobstorefirehose) , [`StaticGoogleBlobStoreFirehose`](../development/extensions-contrib/google.html#staticgoogleblobstorefirehose), and [`StaticCloudFilesFirehose`](../development/extensions-contrib/cloudfiles.html#staticcloudfilesfirehose). diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 53c3641..7608f88 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -506,10 +506,12 @@ public class CompactionTask extends AbstractTask new IngestSegmentFirehoseFactory( dataSchema.getDataSource(), interval, + null, null, // no filter // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(), Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()), + null, toolbox.getIndexIO(), coordinatorClient, segmentLoaderFactory, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 5ccd9d3..4d1e0bf 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -31,12 +31,16 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; -import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.indexing.common.RetryPolicy; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.filter.DimFilter; @@ -51,40 +55,60 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; -public class 
IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser> +public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> { private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class); + private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024; private final String dataSource; + // Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly + // by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel + // batch ingestion. + @Nullable private final Interval interval; + @Nullable + private final List<WindowedSegmentId> segmentIds; private final DimFilter dimFilter; private final List<String> dimensions; private final List<String> metrics; + private final long maxInputSegmentBytesPerTask; private final IndexIO indexIO; private final CoordinatorClient coordinatorClient; private final SegmentLoaderFactory segmentLoaderFactory; private final RetryPolicyFactory retryPolicyFactory; + private List<InputSplit<List<WindowedSegmentId>>> splits; + @JsonCreator public IngestSegmentFirehoseFactory( @JsonProperty("dataSource") final String dataSource, - @JsonProperty("interval") Interval interval, + @Nullable @JsonProperty("interval") Interval interval, + // Specifying "segments" is intended only for when this FirehoseFactory has split itself, + // not for direct end user use. 
+ @Nullable @JsonProperty("segments") List<WindowedSegmentId> segmentIds, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("dimensions") List<String> dimensions, @JsonProperty("metrics") List<String> metrics, + @JsonProperty("maxInputSegmentBytesPerTask") Long maxInputSegmentBytesPerTask, @JacksonInject IndexIO indexIO, @JacksonInject CoordinatorClient coordinatorClient, @JacksonInject SegmentLoaderFactory segmentLoaderFactory, @@ -92,18 +116,42 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar ) { Preconditions.checkNotNull(dataSource, "dataSource"); - Preconditions.checkNotNull(interval, "interval"); + if ((interval == null && segmentIds == null) || (interval != null && segmentIds != null)) { + throw new IAE("Specify exactly one of 'interval' and 'segments'"); + } this.dataSource = dataSource; this.interval = interval; + this.segmentIds = segmentIds; this.dimFilter = dimFilter; this.dimensions = dimensions; this.metrics = metrics; + this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null + ? 
DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK + : maxInputSegmentBytesPerTask; this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory"); this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory"); } + @Override + public FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> withSplit(InputSplit<List<WindowedSegmentId>> split) + { + return new IngestSegmentFirehoseFactory( + dataSource, + null, + split.get(), + dimFilter, + dimensions, + metrics, + maxInputSegmentBytesPerTask, + indexIO, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ); + } + @JsonProperty public String getDataSource() { @@ -116,6 +164,12 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar return interval; } + @JsonProperty + public List<WindowedSegmentId> getSegments() + { + return segmentIds; + } + @JsonProperty("filter") public DimFilter getDimensionsFilter() { @@ -134,50 +188,40 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar return metrics; } + @JsonProperty + public long getMaxInputSegmentBytesPerTask() + { + return maxInputSegmentBytesPerTask; + } + @Override public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) throws ParseException { - log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval); + log.info( + "Connecting firehose: dataSource[%s], interval[%s], segmentIds[%s]", + dataSource, + interval, + segmentIds + ); try { - // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration - // as TaskActionClient. 
- final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); - List<DataSegment> usedSegments; - while (true) { - try { - usedSegments = - coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); - break; - } - catch (Throwable e) { - log.warn(e, "Exception getting database segments"); - final Duration delay = retryPolicy.getAndIncrementRetryDelay(); - if (delay == null) { - throw e; - } else { - final long sleepTime = jitter(delay.getMillis()); - log.info("Will try again in [%s].", new Duration(sleepTime).toString()); - try { - Thread.sleep(sleepTime); - } - catch (InterruptedException e2) { - throw new RuntimeException(e2); - } - } - } - } + final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline(); + // Download all segments locally. + // Note: this requires enough local storage space to fit all of the segments, even though + // IngestSegmentFirehose iterates over the segments in series. We may want to change this + // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory. 
final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap(); - for (DataSegment segment : usedSegments) { - segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment)); + for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) { + for (PartitionChunk<DataSegment> chunk : holder.getObject()) { + final DataSegment segment = chunk.getObject(); + if (!segmentFileMap.containsKey(segment)) { + segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment)); + } + } } - final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = VersionedIntervalTimeline - .forSegments(usedSegments) - .lookup(interval); - final List<String> dims; if (dimensions != null) { dims = dimensions; @@ -250,6 +294,179 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar return retval < 0 ? 0 : retval; } + private List<TimelineObjectHolder<String, DataSegment>> getTimeline() + { + if (interval == null) { + return getTimelineForSegmentIds(); + } else { + return getTimelineForInterval(); + } + } + + private List<TimelineObjectHolder<String, DataSegment>> getTimelineForInterval() + { + Preconditions.checkNotNull(interval); + + // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration + // as TaskActionClient. 
+ final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); + List<DataSegment> usedSegments; + while (true) { + try { + usedSegments = + coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval)); + break; + } + catch (Throwable e) { + log.warn(e, "Exception getting database segments"); + final Duration delay = retryPolicy.getAndIncrementRetryDelay(); + if (delay == null) { + throw e; + } else { + final long sleepTime = jitter(delay.getMillis()); + log.info("Will try again in [%s].", new Duration(sleepTime).toString()); + try { + Thread.sleep(sleepTime); + } + catch (InterruptedException e2) { + throw new RuntimeException(e2); + } + } + } + } + + return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval); + } + + private List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegmentIds() + { + final SortedMap<Interval, TimelineObjectHolder<String, DataSegment>> timeline = new TreeMap<>( + Comparators.intervalsByStartThenEnd() + ); + for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds)) { + final DataSegment segment = coordinatorClient.getDatabaseSegmentDataSourceSegment( + dataSource, + windowedSegmentId.getSegmentId() + ); + for (Interval interval : windowedSegmentId.getIntervals()) { + final TimelineObjectHolder<String, DataSegment> existingHolder = timeline.get(interval); + if (existingHolder != null) { + if (!existingHolder.getVersion().equals(segment.getVersion())) { + throw new ISE("Timeline segments with the same interval should have the same version: " + + "existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment); + } + existingHolder.getObject().add(segment.getShardSpec().createChunk(segment)); + } else { + timeline.put(interval, new TimelineObjectHolder<>( + interval, + segment.getInterval(), + segment.getVersion(), + new PartitionHolder<DataSegment>(segment.getShardSpec().createChunk(segment)) + )); + } + } + } + + // 
Validate that none of the given windows overlaps (except for when multiple segments share exactly the + // same interval). + Interval lastInterval = null; + for (Interval interval : timeline.keySet()) { + if (lastInterval != null) { + if (interval.overlaps(lastInterval)) { + throw new IAE( + "Distinct intervals in input segments may not overlap: [%s] vs [%s]", + lastInterval, + interval + ); + } + } + lastInterval = interval; + } + + return new ArrayList<>(timeline.values()); + } + + private void initializeSplitsIfNeeded() + { + if (splits != null) { + return; + } + + // isSplittable() ensures this is only called when we have an interval. + final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = getTimelineForInterval(); + + // We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing + // problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their + // data can combine with each other anyway. + + List<InputSplit<List<WindowedSegmentId>>> newSplits = new ArrayList<>(); + List<WindowedSegmentId> currentSplit = new ArrayList<>(); + Map<DataSegment, WindowedSegmentId> windowedSegmentIds = new HashMap<>(); + long bytesInCurrentSplit = 0; + for (TimelineObjectHolder<String, DataSegment> timelineHolder : timelineSegments) { + for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) { + final DataSegment segment = chunk.getObject(); + final WindowedSegmentId existingWindowedSegmentId = windowedSegmentIds.get(segment); + if (existingWindowedSegmentId != null) { + // We've already seen this segment in the timeline, so just add this interval to it. It has already + // been placed into a split. + existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval()); + } else { + // It's the first time we've seen this segment, so create a new WindowedSegmentId. 
+ List<Interval> intervals = new ArrayList<>(); + // Use the interval that contributes to the timeline, not the entire segment's true interval. + intervals.add(timelineHolder.getInterval()); + final WindowedSegmentId newWindowedSegmentId = new WindowedSegmentId(segment.getId().toString(), intervals); + windowedSegmentIds.put(segment, newWindowedSegmentId); + + // Now figure out if it goes in the current split or not. + final long segmentBytes = segment.getSize(); + if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) { + // This segment won't fit in the current non-empty split, so this split is done. + newSplits.add(new InputSplit<>(currentSplit)); + currentSplit = new ArrayList<>(); + bytesInCurrentSplit = 0; + } + if (segmentBytes > maxInputSegmentBytesPerTask) { + // If this segment is itself bigger than our max, just put it in its own split. + Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0); + newSplits.add(new InputSplit<>(Collections.singletonList(newWindowedSegmentId))); + } else { + currentSplit.add(newWindowedSegmentId); + bytesInCurrentSplit += segmentBytes; + } + } + } + } + if (!currentSplit.isEmpty()) { + newSplits.add(new InputSplit<>(currentSplit)); + } + + splits = newSplits; + } + + @Override + public boolean isSplittable() + { + // Specifying 'segments' to this factory instead of 'interval' is intended primarily for internal use by + // parallel batch ingestion: we don't need to support splitting a list of segments. 
+ return interval != null; + } + + @Override + public Stream<InputSplit<List<WindowedSegmentId>>> getSplits() + { + initializeSplitsIfNeeded(); + return splits.stream(); + } + + @Override + public int getNumSplits() + { + initializeSplitsIfNeeded(); + return splits.size(); + } + @VisibleForTesting static List<String> getUniqueDimensions( List<TimelineObjectHolder<String, DataSegment>> timelineSegments, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java new file mode 100644 index 0000000..c3f04bb --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.joda.time.Interval; + +import java.util.List; + +/** + * A WindowedSegmentId represents a segment (identified by its segment ID) plus the list of intervals inside it which contribute to a timeline. 
+ * <p> + * This class is intended for serialization in specs. + */ +public class WindowedSegmentId +{ + // This is of the form used by SegmentId. + private final String segmentId; + private final List<Interval> intervals; + + @JsonCreator + public WindowedSegmentId( + @JsonProperty("segmentId") String segmentId, + @JsonProperty("intervals") List<Interval> intervals + ) + { + this.segmentId = Preconditions.checkNotNull(segmentId, "null segmentId"); + this.intervals = Preconditions.checkNotNull(intervals, "null intervals"); + } + + @JsonProperty + public String getSegmentId() + { + return segmentId; + } + + @JsonProperty + public List<Interval> getIntervals() + { + return intervals; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 7f44ad6..70e5544 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -197,9 +197,11 @@ public class IngestSegmentFirehoseFactoryTest final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory( TASK.getDataSource(), Intervals.ETERNITY, + null, new SelectorDimFilter(DIM_NAME, DIM_VALUE, null), dim_names, metric_names, + null, INDEX_IO, cc, slf, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index e224b95..119097b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -26,8 +26,10 @@ 
import com.google.common.collect.Iterables; import com.google.common.io.Files; import org.apache.commons.io.FileUtils; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; @@ -72,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; @RunWith(Parameterized.class) public class IngestSegmentFirehoseFactoryTimelineTest @@ -101,6 +104,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest private final File tmpDir; private final int expectedCount; private final long expectedSum; + private final int segmentCount; private static final ObjectMapper MAPPER; private static final IndexIO INDEX_IO; @@ -118,17 +122,28 @@ public class IngestSegmentFirehoseFactoryTimelineTest IngestSegmentFirehoseFactory factory, File tmpDir, int expectedCount, - long expectedSum + long expectedSum, + int segmentCount ) { this.factory = factory; this.tmpDir = tmpDir; this.expectedCount = expectedCount; this.expectedSum = expectedSum; + this.segmentCount = segmentCount; } @Test - public void testSimple() throws Exception + public void test() throws Exception + { + // Junit 4.12 doesn't have a good way to run tearDown after multiple tests in a Parameterized + // class run. (Junit 4.13 adds @AfterParam but isn't released yet.) Fake it by just running + // "tests" in series inside one @Test. 
+ testSimple(); + testSplit(); + } + + private void testSimple() throws Exception { int count = 0; long sum = 0; @@ -145,6 +160,36 @@ public class IngestSegmentFirehoseFactoryTimelineTest Assert.assertEquals("sum", expectedSum, sum); } + private void testSplit() throws Exception + { + Assert.assertTrue(factory.isSplittable()); + final int numSplits = factory.getNumSplits(); + // We set maxInputSegmentBytesPerTask to 1, so each segment should end up in its own split. + Assert.assertEquals(segmentCount, numSplits); + final List<InputSplit<List<WindowedSegmentId>>> splits = + factory.getSplits().collect(Collectors.toList()); + Assert.assertEquals(numSplits, splits.size()); + + int count = 0; + long sum = 0; + + for (InputSplit<List<WindowedSegmentId>> split : splits) { + final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> splitFactory = + factory.withSplit(split); + try (final Firehose firehose = splitFactory.connect(ROW_PARSER, null)) { + while (firehose.hasMore()) { + final InputRow row = firehose.nextRow(); + count++; + sum += row.getMetric(METRICS[0]).longValue(); + } + } + } + + Assert.assertEquals("count", expectedCount, count); + Assert.assertEquals("sum", expectedSum, sum); + + } + @After public void tearDown() throws Exception { @@ -285,13 +330,26 @@ public class IngestSegmentFirehoseFactoryTimelineTest throw new IllegalArgumentException("WTF"); } } + + @Override + public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId) + { + return testCase.segments + .stream() + .filter(s -> s.getId().toString().equals(segmentId)) + .findAny() + .get(); // throwing if not found is exactly what the real code does + } }; final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory( DATA_SOURCE, testCase.interval, + null, new TrueDimFilter(), Arrays.asList(DIMENSIONS), Arrays.asList(METRICS), + // Split as much as possible + 1L, INDEX_IO, cc, slf, @@ -304,7 +362,8 @@ public class 
IngestSegmentFirehoseFactoryTimelineTest factory, testCase.tmpDir, testCase.expectedCount, - testCase.expectedSum + testCase.expectedSum, + testCase.segments.size() } ); } @@ -384,7 +443,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest Arrays.asList(METRICS), new LinearShardSpec(partitionNum), -1, - 0L + 2L ); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java index e457a1c..b3920a1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java @@ -33,11 +33,16 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest private static String REINDEX_TASK = "/indexer/wikipedia_parallel_reindex_task.json"; private static String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_reindex_queries.json"; private static String INDEX_DATASOURCE = "wikipedia_parallel_index_test"; + private static String INDEX_INGEST_SEGMENT_DATASOURCE = "wikipedia_parallel_ingest_segment_index_test"; + private static String INDEX_INGEST_SEGMENT_TASK = "/indexer/wikipedia_parallel_ingest_segment_index_task.json"; @Test public void testIndexData() throws Exception { - try (final Closeable closeable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())) { + try (final Closeable indexCloseable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); + final Closeable ingestSegmentCloseable = unloader( + INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix()); + ) { doIndexTestTest( INDEX_DATASOURCE, INDEX_TASK, @@ -53,6 +58,13 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest REINDEX_QUERIES_RESOURCE, true ); + + doReindexTest( + INDEX_DATASOURCE, + INDEX_INGEST_SEGMENT_DATASOURCE, + INDEX_INGEST_SEGMENT_TASK, + REINDEX_QUERIES_RESOURCE 
+ ); } } } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json new file mode 100644 index 0000000..535e859 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json @@ -0,0 +1,63 @@ +{ + "type": "index_parallel", + "spec": { + "dataSchema": { + "dataSource": "%%REINDEX_DATASOURCE%%", + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "second", + "intervals": [ + "2013-08-31/2013-09-02" + ] + }, + "parser": { + "parseSpec": { + "format": "json", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensionExclusions": [ + "robot", + "continent" + ] + } + } + } + }, + "ioConfig": { + "type": "index_parallel", + "firehose": { + "type": "ingestSegment", + "dataSource": "%%DATASOURCE%%", + "interval": "2013-08-31/2013-09-02", + "maxInputSegmentBytesPerTask": 1 + } + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumSubTasks": 10 + } + } +} diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index 4188540..247dee1 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -156,4 +156,36 @@ public class CoordinatorClient throw new RuntimeException(e); } } + + public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId) + { + try { + 
FullResponseHolder response = druidLeaderClient.go( + druidLeaderClient.makeRequest( + HttpMethod.GET, + StringUtils.format( + "/druid/coordinator/v1/metadata/datasources/%s/segments/%s", + StringUtils.urlEncode(dataSource), + StringUtils.urlEncode(segmentId) + ) + ) + ); + + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while fetching database segment data source segment status[%s] content[%s]", + response.getStatus(), + response.getContent() + ); + } + return jsonMapper.readValue( + response.getContent(), new TypeReference<DataSegment>() + { + } + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org