ccaominh commented on a change in pull request #8925: Parallel indexing single dim partitions
URL: https://github.com/apache/incubator-druid/pull/8925#discussion_r352432840
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
 ##########
 @@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.hash.BloomFilter;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
+import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
+import org.apache.druid.indexing.common.task.batch.parallel.distribution.TimeDimTuple;
+import org.apache.druid.indexing.common.task.batch.parallel.distribution.TimeDimTupleFactory;
+import org.apache.druid.indexing.common.task.batch.parallel.distribution.TimeDimTupleFunnel;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * The worker task of {@link PartialDimensionDistributionParallelIndexTaskRunner}. This task
+ * determines the distribution of dimension values of input data.
+ */
+
+public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_distribution";
+  private static final Logger LOG = new Logger(PartialDimensionDistributionTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory;
+
+  // For testing
+  private final Supplier<UngroupedRowDimensionValueFilter> ungroupedRowDimValueFilterSupplier;
+
+  @JsonCreator
+  PartialDimensionDistributionTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based 
counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject IndexingServiceClient indexingServiceClient,
+      @JacksonInject IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory
+  )
+  {
+    this(
+        id,
+        groupId,
+        taskResource,
+        supervisorTaskId,
+        numAttempts,
+        ingestionSchema,
+        context,
+        indexingServiceClient,
+        taskClientFactory,
+        () -> new UngroupedRowDimensionValueFilter(
+            ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity()
+        )
+    );
+  }
+
+  @VisibleForTesting  // Only for testing
+  PartialDimensionDistributionTask(
+      @Nullable String id,
+      final String groupId,
+      final TaskResource taskResource,
+      final String supervisorTaskId,
+      final int numAttempts,
+      final ParallelIndexIngestionSpec ingestionSchema,
+      final Map<String, Object> context,
+      IndexingServiceClient indexingServiceClient,
+      IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
+      Supplier<UngroupedRowDimensionValueFilter> ungroupedRowDimValueFilterSupplier
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof SingleDimensionPartitionsSpec,
+        "%s partitionsSpec required",
+        SingleDimensionPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.indexingServiceClient = indexingServiceClient;
+    this.taskClientFactory = taskClientFactory;
+    this.ungroupedRowDimValueFilterSupplier = ungroupedRowDimValueFilterSupplier;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    SingleDimensionPartitionsSpec partitionsSpec = (SingleDimensionPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec);
+    String partitionDimension = partitionsSpec.getPartitionDimension();
+    Preconditions.checkNotNull(partitionDimension, "partitionDimension required");
+    boolean isAssumeGrouped = partitionsSpec.isAssumeGrouped();
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
+                                      .map(AggregatorFactory::getName)
+                                      .collect(Collectors.toList());
+    InputFormat inputFormat = ParallelIndexSupervisorTask.getInputFormat(ingestionSchema);
+    InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
+        inputSource.reader(
+            new InputRowSchema(
+                dataSchema.getTimestampSpec(),
+                dataSchema.getDimensionsSpec(),
+                metricsNames
+            ),
+            inputFormat,
+            null
+        )
+    );
+
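+    // Read the input rows, build a per-interval distribution of the partition dimension's values, and report it to the supervisor task.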
+    try (
+        CloseableIterator<InputRow> inputRowIterator = inputSourceReader.read();
+        HandlingInputRowIterator iterator = new RangePartitionIndexTaskInputRowIteratorBuilder(partitionDimension)
+            .delegate(inputRowIterator)
+            .granularitySpec(granularitySpec)
+            .nullRowRunnable(IndexTaskInputRowIteratorBuilder.NOOP_RUNNABLE)
+            .absentBucketIntervalConsumer(IndexTaskInputRowIteratorBuilder.NOOP_CONSUMER)
+            .build()
+    ) {
+      Map<Interval, StringDistribution> distribution = determineDistribution(
+          iterator,
+          granularitySpec,
+          partitionDimension,
+          isAssumeGrouped,
+          tuningConfig.isLogParseExceptions(),
+          tuningConfig.getMaxParseExceptions()
+      );
+      sendReport(new DimensionDistributionReport(getId(), distribution));
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, StringDistribution> determineDistribution(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      String partitionDimension,
+      boolean isAssumeGrouped,
+      boolean isLogParseExceptions,
+      long maxParseExceptions
+  )
+  {
+    Map<Interval, StringDistribution> intervalToDistribution = new HashMap<>();
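+    // The filter decides which dimension values get added to the per-interval sketches, depending on whether input rows are assumed to be already grouped.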
+    DimensionValueFilter dimValueFilter =
+        isAssumeGrouped
+        ? new GroupedRowDimensionValueFilter()
+        : ungroupedRowDimValueFilterSupplier.get();
+
+    long numParseExceptions = 0;
+
+    while (inputRowIterator.hasNext()) {
+      try {
+        InputRow inputRow = inputRowIterator.next();
+        if (inputRow == null) {
+          continue;
+        }
+
+        DateTime timestamp = inputRow.getTimestamp();
+
+        //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+        Interval interval = granularitySpec.bucketInterval(timestamp).get();
+        StringDistribution stringDistribution =
+            intervalToDistribution.computeIfAbsent(interval, k -> new StringSketch());
+
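+        // The filter returns the dimension value if it should be counted, else null.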
+        String dimensionValue = dimValueFilter.accept(
+            interval,
+            timestamp,
+            inputRow.getDimension(partitionDimension).get(0)
+        );
+
+        if (dimensionValue != null) {
+          stringDistribution.put(dimensionValue);
+        }
+      }
+      catch (ParseException e) {
+        if (isLogParseExceptions) {
+          LOG.error(e, "Encountered parse exception:");
+        }
+
+        numParseExceptions++;
+        if (numParseExceptions > maxParseExceptions) {
+          throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
+        }
+      }
+    }
+
+    // UngroupedDimValueFilter may not accept the min/max dimensionValue. If needed, add the min/max
+    // values to the distributions so they have an accurate min/max.
+    dimValueFilter.getIntervalToMinDimensionValue()
+                  .forEach((interval, min) -> intervalToDistribution.get(interval).putIfNewMin(min));
+    dimValueFilter.getIntervalToMaxDimensionValue()
+                  .forEach((interval, max) -> intervalToDistribution.get(interval).putIfNewMax(max));
+
+    return intervalToDistribution;
+  }
+
+  private void sendReport(DimensionDistributionReport report)
+  {
+    final ParallelIndexSupervisorTaskClient taskClient = taskClientFactory.build(
+        new ClientBasedTaskInfoProvider(indexingServiceClient),
+        getId(),
+        1, // always use a single http thread
+        ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
+        ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
+    );
+    taskClient.report(supervisorTaskId, report);
+  }
+
+  private interface DimensionValueFilter
+  {
+    /**
+     * @return Dimension value if it should be accepted, else null
+     */
+    @Nullable
+    String accept(Interval interval, DateTime timestamp, String dimensionValue);
 
 Review comment:
   Fixed
