[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-19 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203823447
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.apache.commons.io.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
 
 Review comment:
   We have a rough plan for that. It's 'Two phase parallel indexing with 
shuffle' in https://github.com/apache/incubator-druid/issues/5543.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-19 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215000
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final TaskToolbox toolbox;
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // subTaskId -> report
+  private final ConcurrentMap segmentsMap = new 
ConcurrentHashMap<>();
+
+  private volatile boolean stopped;
+  private volatile TaskMonitor taskMonitor;
+
+  private int nextSpecId = 0;
+
+  SinglePhaseParallelIndexTaskRunner(
+  TaskToolbox toolbox,
+  String taskId,
+  String groupId,
+  ParallelIndexIngestionSpec ingestionSchema,
+  Map context,
+  IndexingServiceClient indexingServiceClient
+  )
+  {
+this.toolbox = toolbox;
+this.taskId = taskId;
+this.groupId = groupId;
+this.ingestionSchema = ingestionSchema;
+this.context = context;
+this.baseFirehoseFactory = (FiniteFirehoseFactory) 
ingestionSchema.getIOConfig().getFirehoseFactory();
+this.maxNumTasks = ingestionSchema.getTuningConfig().getMaxNumSubTasks();
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+  }
+
+  @Override
+  public TaskState run() throws Exception
+  {
+final Iterator subTaskSpecIterator = 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-19 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203822871
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSupervisorTask.java
 ##
 @@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.common.Counters;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.stats.RowIngestionMetersFactory;
+import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
+import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
+import io.druid.indexing.common.task.ParallelIndexTaskRunner.SubTaskSpecStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+
+/**
+ * ParallelIndexSupervisorTask is capable of running multiple subTasks for 
parallel indexing. This is
+ * applicable if the input {@link FiniteFirehoseFactory} is splittable. While 
this task is running, it can submit
+ * multiple child tasks to overlords. This task succeeds only when all its 
child tasks succeed; otherwise it fails.
+ *
+ * @see ParallelIndexTaskRunner
+ */
+public class ParallelIndexSupervisorTask extends AbstractTask implements 
ChatHandler
+{
+  static final String TYPE = "index_parallel";
+
+  private static final Logger log = new 
Logger(ParallelIndexSupervisorTask.class);
+
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+  private final RowIngestionMetersFactory rowIngestionMetersFactory;
+
+  private final Counters counters = new Counters();
+
+  private volatile ParallelIndexTaskRunner runner;
+
+  // toolbox is initialized when run() is called, and can be used for processing 
HTTP endpoint requests.
+  private volatile TaskToolbox toolbox;
+
+  @JsonCreator
+  public ParallelIndexSupervisorTask(
+  @JsonProperty("id") String id,
+  @JsonProperty("resource") TaskResource taskResource,
+  @JsonProperty("spec") ParallelIndexIngestionSpec 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215047
 
 

 ##
 File path: server/src/main/java/io/druid/client/indexing/ClientQuery.java
 ##
 @@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.indexing;
+
+public interface ClientQuery
 
 Review comment:
   Nice finding! Removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215022
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting 
failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link 
#getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap runningTasks = new 
ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap> taskHistories = new 
ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // lock for updating running state
+  private final Object startStopLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
expectedNumSucceededTasks)
+  {
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+this.maxRetry = maxRetry;
+this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", 
expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+synchronized (startStopLock) {
+  running = true;
+  log.info("Starting taskMonitor");
+  // NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
+  // That listener should be able to send the events reported to 
TaskRunner to this TaskMonitor.
+  taskStatusChecker.scheduleAtFixedRate(
+  () -> {
+try {
+  final Iterator> iterator = 
runningTasks.entrySet().iterator();
+ 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215000
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final TaskToolbox toolbox;
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // subTaskId -> report
+  private final ConcurrentMap segmentsMap = new 
ConcurrentHashMap<>();
+
+  private volatile boolean stopped;
+  private volatile TaskMonitor taskMonitor;
+
+  private int nextSpecId = 0;
+
+  SinglePhaseParallelIndexTaskRunner(
+  TaskToolbox toolbox,
+  String taskId,
+  String groupId,
+  ParallelIndexIngestionSpec ingestionSchema,
+  Map context,
+  IndexingServiceClient indexingServiceClient
+  )
+  {
+this.toolbox = toolbox;
+this.taskId = taskId;
+this.groupId = groupId;
+this.ingestionSchema = ingestionSchema;
+this.context = context;
+this.baseFirehoseFactory = (FiniteFirehoseFactory) 
ingestionSchema.getIOConfig().getFirehoseFactory();
+this.maxNumTasks = ingestionSchema.getTuningConfig().getMaxNumSubTasks();
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+  }
+
+  @Override
+  public TaskState run() throws Exception
+  {
+final Iterator subTaskSpecIterator = 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203214991
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final TaskToolbox toolbox;
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // subTaskId -> report
+  private final ConcurrentMap segmentsMap = new 
ConcurrentHashMap<>();
+
+  private volatile boolean stopped;
+  private volatile TaskMonitor taskMonitor;
+
+  private int nextSpecId = 0;
+
+  SinglePhaseParallelIndexTaskRunner(
+  TaskToolbox toolbox,
+  String taskId,
+  String groupId,
+  ParallelIndexIngestionSpec ingestionSchema,
+  Map context,
+  IndexingServiceClient indexingServiceClient
+  )
+  {
+this.toolbox = toolbox;
+this.taskId = taskId;
+this.groupId = groupId;
+this.ingestionSchema = ingestionSchema;
+this.context = context;
+this.baseFirehoseFactory = (FiniteFirehoseFactory) 
ingestionSchema.getIOConfig().getFirehoseFactory();
+this.maxNumTasks = ingestionSchema.getTuningConfig().getMaxNumSubTasks();
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+  }
+
+  @Override
+  public TaskState run() throws Exception
+  {
+final Iterator subTaskSpecIterator = 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215030
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.apache.commons.io.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
 
 Review comment:
   Those are the main differences. One more thing is that 
`ParallelIndexSubTask` doesn't collect the HLL (HyperLogLog) data required for perfect 
rollup. `IndexTask` will be deprecated once parallel indexing supports 
perfect rollup and becomes stable, so I think it's easier to maintain both 
tasks during development and delete the deprecated one later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203214983
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSupervisorTask.java
 ##
 @@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.common.Counters;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.stats.RowIngestionMetersFactory;
+import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
+import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
+import io.druid.indexing.common.task.ParallelIndexTaskRunner.SubTaskSpecStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+
+/**
+ * ParallelIndexSupervisorTask is capable of running multiple subTasks for 
parallel indexing. This is
+ * applicable if the input {@link FiniteFirehoseFactory} is splittable. While 
this task is running, it can submit
+ * multiple child tasks to overlords. This task succeeds only when all its 
child tasks succeed; otherwise it fails.
+ *
+ * @see ParallelIndexTaskRunner
+ */
+public class ParallelIndexSupervisorTask extends AbstractTask implements 
ChatHandler
+{
+  static final String TYPE = "index_parallel";
+
+  private static final Logger log = new 
Logger(ParallelIndexSupervisorTask.class);
+
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+  private final RowIngestionMetersFactory rowIngestionMetersFactory;
+
+  private final Counters counters = new Counters();
+
+  private volatile ParallelIndexTaskRunner runner;
+
+  // toolbox is initlized when run() is called, and can be used for processing 
HTTP endpoint requests.
+  private volatile TaskToolbox toolbox;
+
+  @JsonCreator
+  public ParallelIndexSupervisorTask(
+  @JsonProperty("id") String id,
+  @JsonProperty("resource") TaskResource taskResource,
+  @JsonProperty("spec") ParallelIndexIngestionSpec 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203169187
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-17 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203167880
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
 
 Review comment:
   I think this should be addressed in a separate issue, since this code already exists in master.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-16 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r202896825
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting 
failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link 
#getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap runningTasks = new 
ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap> taskHistories = new 
ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
expectedNumSucceededTasks)
+  {
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+this.maxRetry = maxRetry;
+this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", 
expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+running = true;
+log.info("Starting taskMonitor");
+// NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
+// That listener should be able to send the events reported to TaskRunner 
to this TaskMonitor.
+taskStatusChecker.scheduleAtFixedRate(
+() -> {
+  try {
+final Iterator> iterator = 
runningTasks.entrySet().iterator();
+while 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201892300
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
 ##
 @@ -465,6 +467,23 @@ public RemoteTaskRunnerConfig getConfig()
 return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), 
runningTasks.values(), completeTasks.values()));
   }
 
+  @Nullable
+  @Override
+  public RunnerTaskState getRunnerTaskState(String taskId)
 
 Review comment:
   What is the benefit of using Optional instead of returning null here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201892018
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting 
failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link 
#getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap runningTasks = new 
ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap> taskHistories = new 
ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
expectedNumSucceededTasks)
+  {
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+this.maxRetry = maxRetry;
+this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", 
expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+running = true;
+log.info("Starting taskMonitor");
+// NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
+// That listener should be able to send the events reported to TaskRunner 
to this TaskMonitor.
+taskStatusChecker.scheduleAtFixedRate(
+() -> {
+  try {
+final Iterator> iterator = 
runningTasks.entrySet().iterator();
 
 Review comment:
   

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891941
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201892027
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java
 ##
 @@ -0,0 +1,502 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.client.indexing.TaskStatusResponse;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for submitting tasks, monitoring task statuses, resubmitting 
failed tasks, and returning the final task
+ * status.
+ */
+public class TaskMonitor
+{
+  private static final Logger log = new Logger(TaskMonitor.class);
+
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+
+  /**
+   * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
+   * read in {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and updated in {@link #submit}
+   * and {@link #retry}. This can also be read by calling {@link 
#getRunningTaskMonitorEntory},
+   * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}.
+   */
+  private final ConcurrentMap runningTasks = new 
ConcurrentHashMap<>();
+
+  /**
+   * A map of subTaskSpecId to {@link TaskHistory}. This map stores the 
history of complete {@link SubTaskSpec}s
+   * whether their final state is succeeded or failed. This is updated in 
{@link MonitorEntry#setLastStatus} which is
+   * called by the {@link java.util.concurrent.Callable} executed by {@link 
#taskStatusChecker} and can be
+   * read by outside of this class.
+   */
+  private final ConcurrentMap> taskHistories = new 
ConcurrentHashMap<>();
+
+  // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks
+  private final Object taskCountLock = new Object();
+
+  // overlord client
+  private final IndexingServiceClient indexingServiceClient;
+  private final int maxRetry;
+  private final int expectedNumSucceededTasks;
+
+  private int numRunningTasks;
+  private int numSucceededTasks;
+  private int numFailedTasks;
+
+  private volatile boolean running = false;
+
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
expectedNumSucceededTasks)
+  {
+this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
+this.maxRetry = maxRetry;
+this.expectedNumSucceededTasks = expectedNumSucceededTasks;
+
+log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", 
expectedNumSucceededTasks);
+  }
+
+  public void start(long taskStatusCheckingPeriod)
+  {
+running = true;
+log.info("Starting taskMonitor");
+// NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
+// That listener should be able to send the events reported to TaskRunner 
to this TaskMonitor.
+taskStatusChecker.scheduleAtFixedRate(
+() -> {
+  try {
+final Iterator> iterator = 
runningTasks.entrySet().iterator();
+while 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891972
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891892
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import io.druid.indexer.TaskState;
+import io.druid.indexing.common.TaskToolbox;
+
+/**
+ * ParallelIndexTaskRunner is the actual task runner of {@link 
ParallelIndexSupervisorTask}. There is currently a single
+ * implementation, i.e. {@link SinglePhaseParallelIndexTaskRunner} which 
supports only best-effort roll-up. We can add
+ * more implementations in the future.
+ */
+public interface ParallelIndexTaskRunner
+{
+  TaskState run(TaskToolbox toolbox) throws Exception;
 
 Review comment:
   So, `ParallelIndexTaskRunner` is the component that actually processes the 
task. It can have multiple implementations (probably two), one for each 
distributed indexing algorithm described in https://github.com/apache/incubator-druid/issues/5543.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891935
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891918
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/PushedSegmentsReport.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.timeline.DataSegment;
+
+import java.util.List;
+
+public class PushedSegmentsReport
 
 Review comment:
   Just the native parallel batch one. Added javadoc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891941
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,624 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputSplit;
+import io.druid.indexer.TaskState;
+import io.druid.indexer.TaskStatusPlus;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status;
+import io.druid.indexing.common.task.TaskMonitor.MonitorEntry;
+import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent;
+import io.druid.indexing.common.task.TaskMonitor.TaskHistory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.ChatHandlers;
+import io.druid.server.security.Action;
+import io.druid.server.security.AuthorizerMapper;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An implementation of {@link ParallelIndexTaskRunner} to support best-effort 
roll-up. This runner can submit and
+ * monitor multiple {@link ParallelIndexSubTask}s.
+ *
+ * As its name indicates, distributed indexing is done in a single phase, 
i.e., without shuffling intermediate data. As
+ * a result, this task can't be used for perfect rollup.
+ */
+public class SinglePhaseParallelIndexTaskRunner implements 
ParallelIndexTaskRunner, ChatHandler
+{
+  private static final Logger log = new 
Logger(SinglePhaseParallelIndexTaskRunner.class);
+
+  private final String taskId;
+  private final String groupId;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final Map context;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final int maxNumTasks;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+
+  private final BlockingQueue> 
taskCompleteEvents =
+  new LinkedBlockingDeque<>();
+
+  // 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891798
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
 
 Review comment:
   That would require a lot of changes in `GranularitySpec` and all of its call 
sites. It should be done in a separate PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891847
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
+{
+  static final String TYPE = "index_sub";
+
+  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory 
taskClientFactory;
+
+  @JsonCreator
+  public ParallelIndexSubTask(
+  // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
+  @JsonProperty("id") @Nullable final String id,
+  

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891822
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
+{
+  static final String TYPE = "index_sub";
+
+  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory 
taskClientFactory;
+
+  @JsonCreator
+  public ParallelIndexSubTask(
+  // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
+  @JsonProperty("id") @Nullable final String id,
+  

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891862
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskClient.java
 ##
 @@ -0,0 +1,80 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.indexing.common.IndexTaskClient;
+import io.druid.indexing.common.TaskInfoProvider;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ParallelIndexTaskClient extends IndexTaskClient
+{
+  private final String subtaskId;
+
+  public ParallelIndexTaskClient(
 
 Review comment:
   Changed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891892
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskRunner.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.task;
+
+import io.druid.indexer.TaskState;
+import io.druid.indexing.common.TaskToolbox;
+
+/**
+ * ParallelIndexTaskRunner is the actual task runner of {@link 
ParallelIndexSupervisorTask}. There is currently a single
+ * implementation, i.e. {@link SinglePhaseParallelIndexTaskRunner} which 
supports only best-effort roll-up. We can add
+ * more implementations in the future.
+ */
+public interface ParallelIndexTaskRunner
+{
+  TaskState run(TaskToolbox toolbox) throws Exception;
 
 Review comment:
   So, `ParallelIndexTaskRunner` is the component that actually processes the task. It 
can have multiple implementations, one for each distributed indexing algorithm 
described in https://github.com/apache/incubator-druid/issues/5543.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891785
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
 
 Review comment:
   Good catch! Fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891803
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
+{
+  static final String TYPE = "index_sub";
+
+  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory 
taskClientFactory;
+
+  @JsonCreator
+  public ParallelIndexSubTask(
+  // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
+  @JsonProperty("id") @Nullable final String id,
+  

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891856
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSupervisorTask.java
 ##
 @@ -0,0 +1,322 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.FiniteFirehoseFactory;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.stats.RowIngestionMetersFactory;
+import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
+import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.server.security.AuthorizerMapper;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+
+/**
+ * ParallelIndexSupervisorTask is capable of running multiple subTasks for 
parallel indexing. This is
+ * applicable if the input {@link FiniteFirehoseFactory} is splittable. While 
this task is running, it can submit
+ * multiple child tasks to overlords. This task succeeds only when all its 
child tasks succeed; otherwise it fails.
+ *
+ * @see ParallelIndexTaskRunner
+ */
+public class ParallelIndexSupervisorTask extends AbstractTask implements 
ChatHandler
+{
+  static final String TYPE = "index_parallel";
+
+  private static final Logger log = new 
Logger(ParallelIndexSupervisorTask.class);
+
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final FiniteFirehoseFactory baseFirehoseFactory;
+  private final IndexingServiceClient indexingServiceClient;
+  private final ChatHandlerProvider chatHandlerProvider;
+  private final AuthorizerMapper authorizerMapper;
+  private final RowIngestionMetersFactory rowIngestionMetersFactory;
+
+  private ParallelIndexTaskRunner runner;
+
+  @JsonCreator
+  public ParallelIndexSupervisorTask(
+  @JsonProperty("id") String id,
+  @JsonProperty("resource") TaskResource taskResource,
+  @JsonProperty("spec") ParallelIndexIngestionSpec ingestionSchema,
+  @JsonProperty("context") Map context,
+  @JacksonInject @Nullable IndexingServiceClient indexingServiceClient, // 
null in overlords
+  @JacksonInject @Nullable ChatHandlerProvider chatHandlerProvider, // 
null in overlords
+  @JacksonInject AuthorizerMapper authorizerMapper,
+  @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+  )
+  {
+super(
+getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+null,
+taskResource,
+ingestionSchema.getDataSchema().getDataSource(),
+context
+);
+
+this.ingestionSchema = ingestionSchema;
+
+final FirehoseFactory firehoseFactory = 
ingestionSchema.getIOConfig().getFirehoseFactory();
+if (!(firehoseFactory instanceof FiniteFirehoseFactory)) {
+  throw new IAE("[%s] should implement FiniteFirehoseFactory", 
firehoseFactory.getClass().getSimpleName());
+}
+
+this.baseFirehoseFactory = (FiniteFirehoseFactory) firehoseFactory;
+this.indexingServiceClient = indexingServiceClient;
+this.chatHandlerProvider = chatHandlerProvider;
+this.authorizerMapper = authorizerMapper;
+this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+
+if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() > 0) {
+ 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891827
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java
 ##
 @@ -0,0 +1,451 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.client.indexing.IndexingServiceClient;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.indexer.TaskStatus;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskLockType;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentAllocateAction;
+import io.druid.indexing.common.actions.SurrogateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.common.parsers.ParseException;
+import io.druid.query.DruidMetrics;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.timeline.DataSegment;
+import org.codehaus.plexus.util.FileUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link 
IndexTask}, but this task
+ * generates and pushes segments, and reports them to the {@link 
ParallelIndexSupervisorTask} instead of
+ * publishing on its own.
+ */
+public class ParallelIndexSubTask extends AbstractTask
+{
+  static final String TYPE = "index_sub";
+
+  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+  private final IndexingServiceClient indexingServiceClient;
+  private final IndexTaskClientFactory 
taskClientFactory;
+
+  @JsonCreator
+  public ParallelIndexSubTask(
+  // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
+  @JsonProperty("id") @Nullable final String id,
+  

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891749
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.druid.indexing.common.Counters;
+import io.druid.indexing.common.task.Task;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * This action is to find a proper {@link 
io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of
+ * the next shard number per {@link Interval} in {@link Counters}. The next 
shard number is incremented by 1 whenever a
+ * new {@link SegmentIdentifier} is allocated.
+ */
+public class CountingSegmentAllocateAction implements 
TaskAction
+{
+  private final String dataSource;
+  private final DateTime timestamp;
+  private final GranularitySpec granularitySpec;
+  @JsonDeserialize(keyUsing = IntervalDeserializer.class)
+  private final Map versions;
+
+  private final SortedSet bucketIntervals;
+
+  @JsonCreator
+  public CountingSegmentAllocateAction(
+  @JsonProperty("dataSource") String dataSource,
+  @JsonProperty("timestamp") DateTime timestamp,
+  @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+  @JsonProperty("versions") Map versions
+  )
+  {
+this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
+this.granularitySpec = Preconditions.checkNotNull(granularitySpec, 
"granularitySpec");
+this.versions = Preconditions.checkNotNull(versions, "versions");
+
+this.bucketIntervals = 
Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), 
"bucketIntervals");
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+return dataSource;
+  }
+
+  @JsonProperty
+  public DateTime getTimestamp()
+  {
+return timestamp;
+  }
+
+  @JsonProperty
+  public GranularitySpec getGranularitySpec()
+  {
+return granularitySpec;
+  }
+
+  @JsonProperty
+  public Map getVersions()
+  {
+return versions;
+  }
+
+  @Override
+  public TypeReference getReturnTypeReference()
+  {
+return new TypeReference()
+{
+};
+  }
+
+  @Override
+  public SegmentIdentifier perform(Task task, TaskActionToolbox toolbox)
+  {
+Optional maybeInterval = 
granularitySpec.bucketInterval(timestamp);
+if (!maybeInterval.isPresent()) {
+  throw new ISE("Could not find interval for timestamp [%s]", timestamp);
+}
+
+final Interval interval = maybeInterval.get();
+if (!bucketIntervals.contains(interval)) {
+  throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", 
interval, granularitySpec);
+}
+
+final Counters counters = toolbox.getCounters();
 
 Review comment:
   Oh, thanks for finding this! `Counters` is expected to be created per task, 
not a singleton. Will fix this soon.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891665
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891629
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/Counters.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.google.common.util.concurrent.AtomicDouble;
 
 Review comment:
   Yes, only the Google Common (Guava) one is available.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891657
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if 

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891707
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.druid.indexing.common.Counters;
+import io.druid.indexing.common.task.Task;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * This action is to find a proper {@link 
io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of
+ * the next shard number per {@link Interval} in {@link Counters}. The next 
shard number is incremented by 1 whenever a
+ * new {@link SegmentIdentifier} is allocated.
+ */
+public class CountingSegmentAllocateAction implements 
TaskAction
+{
+  private final String dataSource;
+  private final DateTime timestamp;
+  private final GranularitySpec granularitySpec;
+  @JsonDeserialize(keyUsing = IntervalDeserializer.class)
+  private final Map versions;
+
+  private final SortedSet bucketIntervals;
+
+  @JsonCreator
+  public CountingSegmentAllocateAction(
+  @JsonProperty("dataSource") String dataSource,
+  @JsonProperty("timestamp") DateTime timestamp,
+  @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+  @JsonProperty("versions") Map versions
+  )
+  {
+this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
+this.granularitySpec = Preconditions.checkNotNull(granularitySpec, 
"granularitySpec");
+this.versions = Preconditions.checkNotNull(versions, "versions");
+
+this.bucketIntervals = 
Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), 
"bucketIntervals");
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+return dataSource;
+  }
+
+  @JsonProperty
+  public DateTime getTimestamp()
+  {
+return timestamp;
+  }
+
+  @JsonProperty
+  public GranularitySpec getGranularitySpec()
+  {
+return granularitySpec;
+  }
+
+  @JsonProperty
+  public Map getVersions()
+  {
+return versions;
+  }
+
+  @Override
+  public TypeReference getReturnTypeReference()
+  {
+return new TypeReference()
 
 Review comment:
   Hmm, what is the benefit of making this a static final variable here? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891619
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java
 ##
 @@ -50,5 +53,8 @@
   @Override
   public void configure(Binder binder)
   {
+binder.bind(new 
TypeLiteral>(){})
 
 Review comment:
   Good catch. Fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891640
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
 
 Review comment:
   `IndexTaskClient` can theoretically be used by any node, but it is currently 
used by both overlords and middleManagers (more precisely, peons). In the 
Kafka indexing service, the supervisor (which runs on an overlord) uses 
this to communicate with kafkaIndexTasks. 
   
   In native batch indexing, the supervisorTask (which runs on a peon) uses 
this to communicate with subTasks. I think there should be no significant 
issue here, because each subTask calls a REST API of the supervisorTask only once 
before it finishes.
   
   > If that is true what are the considerations for number of http server 
threads the overlord needs compared to settings here for large clusters?
   
   It may be a problem; I'm not sure. @gianm, any thoughts?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891627
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/appenderator/SegmentAllocateActionGenerator.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.appenderator;
+
+import io.druid.data.input.InputRow;
+import io.druid.indexing.common.actions.TaskAction;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+
+public interface SegmentAllocateActionGenerator
+{
+  TaskAction generate(
 
 Review comment:
   Added.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891613
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -362,317 +288,70 @@ public boolean setEndOffsets(
 log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, 
endOffsets, finalize);
 
 try {
-  final FullResponseHolder response = submitRequest(
+  final FullResponseHolder response = submitJsonRequest(
   id,
   HttpMethod.POST,
   "offsets/end",
   StringUtils.format("finish=%s", finalize),
-  jsonMapper.writeValueAsBytes(endOffsets),
+  serialize(endOffsets),
   true
   );
-  return response.getStatus().getCode() / 100 == 2;
+  return isSuccess(response);
 }
 catch (NoTaskLocationException e) {
   return false;
 }
 catch (IOException e) {
-  throw new RuntimeException(e);
+  throw Throwables.propagate(e);
 }
   }
 
   public ListenableFuture stopAsync(final String id, final boolean 
publish)
   {
-return executorService.submit(
-new Callable()
-{
-  @Override
-  public Boolean call()
-  {
-return stop(id, publish);
-  }
-}
-);
+return doAsync(() -> stop(id, publish));
   }
 
   public ListenableFuture resumeAsync(final String id)
   {
-return executorService.submit(
-new Callable()
-{
-  @Override
-  public Boolean call()
-  {
-return resume(id);
-  }
-}
-);
+return doAsync(() -> resume(id));
 
 Review comment:
   Yes, we can use CompletableFuture instead. This is probably related to 
https://github.com/apache/incubator-druid/issues/5415. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891602
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -362,317 +288,70 @@ public boolean setEndOffsets(
 log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, 
endOffsets, finalize);
 
 try {
-  final FullResponseHolder response = submitRequest(
+  final FullResponseHolder response = submitJsonRequest(
   id,
   HttpMethod.POST,
   "offsets/end",
   StringUtils.format("finish=%s", finalize),
-  jsonMapper.writeValueAsBytes(endOffsets),
+  serialize(endOffsets),
   true
   );
-  return response.getStatus().getCode() / 100 == 2;
+  return isSuccess(response);
 }
 catch (NoTaskLocationException e) {
   return false;
 }
 catch (IOException e) {
-  throw new RuntimeException(e);
+  throw Throwables.propagate(e);
 
 Review comment:
   Good point. Changed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-11 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891581
 
 

 ##
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##
 @@ -180,18 +113,17 @@ public boolean resume(final String id)
 
   if (response.getStatus().equals(HttpResponseStatus.OK)) {
 log.info("Task [%s] paused successfully", id);
-return jsonMapper.readValue(response.getContent(), new 
TypeReference>()
+return deserialize(response.getContent(), new 
TypeReference>()
 
 Review comment:
   What do you mean? The returned object is a map, and it is used by `KafkaSupervisor`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org



[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle

2018-07-09 Thread GitBox
jihoonson commented on a change in pull request #5492: Native parallel batch 
indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201195250
 
 

 ##
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java
 ##
 @@ -0,0 +1,381 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexer.TaskStatus;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.IOE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.segment.realtime.firehose.ChatHandlerResource;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
+ * data and send an HTTP request.
+ */
+public abstract class IndexTaskClient implements AutoCloseable
+{
+  public static class NoTaskLocationException extends RuntimeException
+  {
+public NoTaskLocationException(String message)
+{
+  super(message);
+}
+  }
+
+  public static class TaskNotRunnableException extends RuntimeException
+  {
+public TaskNotRunnableException(String message)
+{
+  super(message);
+}
+  }
+
+  public static final int MAX_RETRY_WAIT_SECONDS = 10;
+
+  private static final EmittingLogger log = new 
EmittingLogger(IndexTaskClient.class);
+  private static final String BASE_PATH = "/druid/worker/v1/chat";
+  private static final int MIN_RETRY_WAIT_SECONDS = 2;
+  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
+
+  private final HttpClient httpClient;
+  private final ObjectMapper objectMapper;
+  private final TaskInfoProvider taskInfoProvider;
+  private final Duration httpTimeout;
+  private final RetryPolicyFactory retryPolicyFactory;
+  private final ListeningExecutorService executorService;
+
+  public IndexTaskClient(
+  HttpClient httpClient,
+  ObjectMapper objectMapper,
+  TaskInfoProvider taskInfoProvider,
+  Duration httpTimeout,
+  String callerId,
+  int numThreads,
+  long numRetries
+  )
+  {
+this.httpClient = httpClient;
+this.objectMapper = objectMapper;
+this.taskInfoProvider = taskInfoProvider;
+this.httpTimeout = httpTimeout;
+this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries);
+this.executorService = MoreExecutors.listeningDecorator(
+Execs.multiThreaded(
+numThreads,
+StringUtils.format(
+"IndexTaskClient-%s-%%d",
+callerId
+)
+)
+);
+  }
+
+  private static RetryPolicyFactory initializeRetryPolicyFactory(long 
numRetries)
+  {
+// Retries [numRetries] times before giving up; this should be set long 
enough to handle any temporary
+// unresponsiveness such as network issues, if