[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-25 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307532931
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -463,6 +463,32 @@ public void nodeViewInitialized()
 }
 );
 
+DruidNodeDiscovery druidNodeDiscoveryIndexer = druidNodeDiscoveryProvider.getForNodeType(NodeType.INDEXER);
 
 Review comment:
   Instead of repeating this code, you can change line 441 to:
   
   ```
   DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-25 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307552680
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime;
+
+import com.google.common.base.Throwables;
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
+
+import java.io.IOException;
+
+/**
+ * Ties the {@link DataSegmentServerAnnouncer} announce/unannounce to the lifecycle start and stop.
+ *
+ * Analogous to {@link org.apache.druid.server.coordination.SegmentLoadDropHandler} on the Historicals,
+ * but without segment cache management.
+ */
+@ManageLifecycle
+public class CliIndexerDataSegmentServerAnnouncerLifecycleHandler
+{
+  private static final EmittingLogger LOG = new EmittingLogger(CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class);
+
+  private final DataSegmentServerAnnouncer dataSegmentServerAnnouncer;
+
+  // Synchronizes start/stop of this object.
+  private final Object startStopLock = new Object();
+
+  private volatile boolean started = false;
 
 Review comment:
   There is a `LifecycleLock` utility class for managing lifecycle; it can be 
used here instead of the `startStopLock` object and the `started` flag.
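
A minimal sketch of the start/stop guard pattern this comment points at. It is an illustration under stated assumptions, not the Druid implementation: `SimpleLifecycleLock` is a hypothetical, simplified stand-in whose method names (`canStart`, `started`, `exitStart`, `canStop`) mirror the ones `LifecycleLock` is used with, and `Announcer` is a hypothetical stand-in for `DataSegmentServerAnnouncer`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a lifecycle lock. Only the method
// names mirror the utility mentioned in the review; the body is an assumption.
class SimpleLifecycleLock
{
  private static final int NOT_STARTED = 0;
  private static final int STARTING = 1;
  private static final int STARTED = 2;
  private static final int START_FAILED = 3;
  private static final int STOPPING = 4;

  private final AtomicInteger state = new AtomicInteger(NOT_STARTED);

  // Only the first caller wins the right to run start().
  boolean canStart()
  {
    return state.compareAndSet(NOT_STARTED, STARTING);
  }

  // Marks start() as successful.
  void started()
  {
    if (!state.compareAndSet(STARTING, STARTED)) {
      throw new IllegalStateException("started() called outside of start()");
    }
  }

  // Called in a finally block; records a failure if started() was never reached.
  void exitStart()
  {
    state.compareAndSet(STARTING, START_FAILED);
  }

  // Only the first caller after a successful start wins the right to stop.
  boolean canStop()
  {
    return state.compareAndSet(STARTED, STOPPING);
  }
}

// Hypothetical stand-in for the announcer dependency.
interface Announcer
{
  void announce();

  void unannounce();
}

class AnnouncerLifecycleHandler
{
  private final Announcer announcer;
  private final SimpleLifecycleLock lifecycleLock = new SimpleLifecycleLock();

  AnnouncerLifecycleHandler(Announcer announcer)
  {
    this.announcer = announcer;
  }

  void start()
  {
    if (!lifecycleLock.canStart()) {
      throw new IllegalStateException("already started");
    }
    try {
      announcer.announce();
      lifecycleLock.started();
    }
    finally {
      lifecycleLock.exitStart();
    }
  }

  void stop()
  {
    if (!lifecycleLock.canStop()) {
      throw new IllegalStateException("not started or already stopped");
    }
    announcer.unannounce();
  }
}
```

With this shape the handler needs neither a separate lock object nor a volatile flag; a second call to `start()` is rejected by the lock itself.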





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-25 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307553533
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
 ##
 @@ -197,6 +198,42 @@
 log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
   }
 
+  /**
+   * This constructor allows the caller to provide its own sink timeline and segment walker.
+   *
+   * It is used by UnifiedIndexerAppenderatorsManager which allows queries on data associated with multiple
+   * Appenderators.
+   */
+  AppenderatorImpl(
 
 Review comment:
   Can the other constructor delegate to this constructor instead of repeating everything?
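
A rough sketch of the delegation being asked about, assuming a much smaller class than the real one. `AppenderatorSketch`, its fields, and its parameters are hypothetical and do not reflect the actual `AppenderatorImpl` signature; the point is only that both entry points funnel into one constructor that assigns every field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification: names and fields are illustrative only.
class AppenderatorSketch
{
  private final String dataSource;
  private final List<String> sinkTimeline;
  private final boolean ownsTimeline;

  // Convenience constructor: builds its own timeline, then delegates.
  AppenderatorSketch(String dataSource)
  {
    this(dataSource, new ArrayList<>(), true);
  }

  // Constructor for callers that share one timeline across instances.
  AppenderatorSketch(String dataSource, List<String> sharedTimeline)
  {
    this(dataSource, sharedTimeline, false);
  }

  // The single place where every field is assigned.
  private AppenderatorSketch(String dataSource, List<String> sinkTimeline, boolean ownsTimeline)
  {
    this.dataSource = dataSource;
    this.sinkTimeline = sinkTimeline;
    this.ownsTimeline = ownsTimeline;
  }

  String getDataSource()
  {
    return dataSource;
  }

  List<String> getSinkTimeline()
  {
    return sinkTimeline;
  }

  boolean ownsTimeline()
  {
    return ownsTimeline;
  }
}
```

Adding a field later then touches one constructor body instead of two.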





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-25 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307554304
 
 

 ##
 File path: 
server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
 ##
 @@ -34,12 +34,45 @@
  */
 public class ServerConfig
 {
-
   public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096;
 
+  public ServerConfig(
 
 Review comment:
   I'm surprised that serde works fine with two constructors and neither one 
marked `@JsonCreator`. How does ObjectMapper decide which constructor to use 
for deserialization?





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307927280
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implemention for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to number of worker slots. This is used to execute the Task run() methods.
+ * - A control thread pool, sized to worker slots * 2. The control threads setup and submit work to the
+ *   task execution pool, and are also responsible for running graceful shutdown on the Task objects.
+ *   Only one shutdown per-task can be running at a given time, and there is one control thread per task,
+ *   thus the pool has 2 * worker slots.
+ *
+ * Note that separate task logs are not supported, all task log entries will be written to the Indexer process log
+ * instead.
+ */
+public class ThreadingTaskRunner
+    extends BaseRestorableTaskRunner
+    implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final TaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolbo

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307846239
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for {@link ForkingTaskRunner} and {@link ThreadingTaskRunner} which support task rest
+ * oration.
 
 Review comment:
   nit: can we keep "restoration" as a single word?





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307850373
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.File;
+import java.util.Map;
+
+public class SingleFileTaskReportFileWriter extends TaskReportFileWriter
+{
+  private static final Logger log = new Logger(SingleFileTaskReportFileWriter.class);
+
+  private final File reportsFile;
+
+  public SingleFileTaskReportFileWriter(File reportsFile)
+  {
+    this.reportsFile = reportsFile;
+  }
+
+  @Override
+  public void write(String taskId, Map reports)
 
 Review comment:
   It is odd that an instance of this class will retain and manage a map of 
taskId -> file (because it inherits from TaskReportFileWriter) that is never 
used.
   TaskReportFileWriter could probably be trimmed down a lot, declared 
abstract or as an interface, with a separate `MultiFileTaskReportFileWriter` 
used wherever `TaskReportFileWriter` is currently used.
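
One possible shape for that split, sketched under assumptions rather than taken from the PR. The interface and class names follow the comment above, but the method signatures are hypothetical, and an in-memory `StringBuilder` stands in for each report file so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical shape of the suggested split. An in-memory StringBuilder stands
// in for each report file; the real classes write to files on disk.
interface TaskReportFileWriter
{
  void write(String taskId, Map<String, String> reports);
}

// Single-file variant: no taskId -> file map to carry around.
class SingleFileTaskReportFileWriter implements TaskReportFileWriter
{
  private final StringBuilder reportsFile;

  SingleFileTaskReportFileWriter(StringBuilder reportsFile)
  {
    this.reportsFile = reportsFile;
  }

  @Override
  public void write(String taskId, Map<String, String> reports)
  {
    reportsFile.append(taskId).append('=').append(reports);
  }
}

// Multi-file variant: the only implementation that tracks one file per task.
class MultiFileTaskReportFileWriter implements TaskReportFileWriter
{
  private final Map<String, StringBuilder> taskReportFiles = new HashMap<>();

  void add(String taskId, StringBuilder reportsFile)
  {
    taskReportFiles.put(taskId, reportsFile);
  }

  @Override
  public void write(String taskId, Map<String, String> reports)
  {
    final StringBuilder reportsFile = taskReportFiles.get(taskId);
    if (reportsFile == null) {
      // Assumption: unknown tasks are skipped; real code would likely log this.
      return;
    }
    reportsFile.append(taskId).append('=').append(reports);
  }
}
```

Each implementation then carries only the state it actually uses, and callers depend on the interface.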





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307848163
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for {@link ForkingTaskRunner} and {@link ThreadingTaskRunner} 
which support task rest
+ * oration.
+ */
+public abstract class BaseRestorableTaskRunner implements TaskRunner
+{
+  protected static final EmittingLogger LOG = new 
EmittingLogger(BaseRestorableTaskRunner.class);
+  protected static final String TASK_RESTORE_FILENAME = "restore.json";
+
+  protected final CopyOnWriteArrayList> 
listeners = new CopyOnWriteArrayList<>();
+
+  /** Writes must be synchronized. This is only a ConcurrentMap so 
"informational" reads can occur without waiting. */
+  protected final ConcurrentHashMap tasks = new 
ConcurrentHashMap<>();
+  protected final ObjectMapper jsonMapper;
+  protected final TaskConfig taskConfig;
+
+  public BaseRestorableTaskRunner(
+  ObjectMapper jsonMapper,
+  TaskConfig taskConfig
+  )
+  {
+this.jsonMapper = jsonMapper;
+this.taskConfig = taskConfig;
+  }
+
+  @Override
+  public List>> restore()
+  {
+final File restoreFile = getRestoreFile();
+final TaskRestoreInfo taskRestoreInfo;
+if (restoreFile.exists()) {
+  try {
+taskRestoreInfo = jsonMapper.readValue(restoreFile, 
TaskRestoreInfo.class);
+  }
+  catch (Exception e) {
+LOG.error(e, "Failed to read restorable tasks from file[%s]. Skipping 
restore.", restoreFile);
+return ImmutableList.of();
+  }
+} else {
+  return ImmutableList.of();
+}
+
+final List>> retVal = new 
ArrayList<>();
+for (final String taskId : taskRestoreInfo.getRunningTasks()) {
+  try {
+final File taskFile = new File(taskConfig.getTaskDir(taskId), 
"task.json");
+final Task task = jsonMapper.readValue(taskFile, Task.class);
+
+if (!task.getId().equals(taskId)) {
+  throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", 
taskId, task.getId());
+}
+
+if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
+  LOG.info("Restoring task[%s].", task.getId());
+  retVal.add(Pair.of(task, run(task)));
+}
+  }
+  catch (Exception e) {
+LOG.warn(e, "Failed to restore task[%s]. Trying to restore other 
tasks.", taskId);
+  }
+}
+
+LOG.info("Restored %,d tasks.", retVal.size());
+
+return retVal;
+  }
+
+  @Override
+  public void registerListener(TaskRunnerListener listener, Executor executor)
+  {
+for (Pair pair : listeners) {
+  if (pair.lhs.getListenerId().equals(listener.getListenerId())) {
+throw new ISE("Listener [%s] already registered", 
listener.getListenerId());
+  }
+}
+
+final Pair listenerPair = Pair.of(listener, 
executor);
+
+synchronized (tasks) {
+  

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307924654
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteSource;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implemention for the Indexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to number of worker slots. This is used to execute the Task run() methods.
+ * - A control thread pool, sized to worker slots * 2. The control threads setup and submit work to the
+ *   task execution pool, and are also responsible for running graceful shutdown on the Task objects.
+ *   Only one shutdown per-task can be running at a given time, and there is one control thread per task,
+ *   thus the pool has 2 * worker slots.
+ *
+ * Note that separate task logs are not supported, all task log entries will be written to the Indexer process log
 
 Review comment:
   Is this an intentional choice to keep all logs in the same place, or is 
there no separation because we haven't figured it out yet?
   If we do end up keeping all logs in the same place, then it would be great if 
taskId is included in 

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307929711
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implemention for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to number of worker slots. This is used to execute the Task run() methods.
+ * - A control thread pool, sized to worker slots * 2. The control threads setup and submit work to the
+ *   task execution pool, and are also responsible for running graceful shutdown on the Task objects.
+ *   Only one shutdown per-task can be running at a given time, and there is one control thread per task,
+ *   thus the pool has 2 * worker slots.
+ *
+ * Note that separate task logs are not supported, all task log entries will be written to the Indexer process log
+ * instead.
+ */
+public class ThreadingTaskRunner
+    extends BaseRestorableTaskRunner
+    implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final TaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolbo

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307927997
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implementation for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to the number of worker slots. This is used to execute the Task run() methods.
+ * - A control thread pool, sized to worker slots * 2. The control threads set up and submit work to the
+ *   task execution pool, and are also responsible for running graceful shutdown on the Task objects.
+ *   Only one shutdown per task can be running at a given time, and there is one control thread per task,
+ *   thus the pool has 2 * worker slots.
+ *
+ * Note that separate task logs are not supported; all task log entries will be written to the Indexer process log
+ * instead.
+ */
+public class ThreadingTaskRunner
+extends BaseRestorableTaskRunner
+implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final TaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolbo

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307926555
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implementation for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to the number of worker slots. This is used to execute the Task run() methods.
+ * - A control thread pool, sized to worker slots * 2. The control threads set up and submit work to the
+ *   task execution pool, and are also responsible for running graceful shutdown on the Task objects.
+ *   Only one shutdown per task can be running at a given time, and there is one control thread per task,
+ *   thus the pool has 2 * worker slots.
+ *
+ * Note that separate task logs are not supported; all task log entries will be written to the Indexer process log
+ * instead.
+ */
+public class ThreadingTaskRunner
+extends BaseRestorableTaskRunner
+implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final TaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolbo

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307930508
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implementation for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to the number of worker slots. This is used to execute the Task run() methods.
+ * - A control thread pool, sized to worker slots * 2. The control threads set up and submit work to the
+ *   task execution pool, and are also responsible for running graceful shutdown on the Task objects.
+ *   Only one shutdown per task can be running at a given time, and there is one control thread per task,
+ *   thus the pool has 2 * worker slots.
+ *
+ * Note that separate task logs are not supported; all task log entries will be written to the Indexer process log
+ * instead.
+ */
+public class ThreadingTaskRunner
+extends BaseRestorableTaskRunner
+implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final TaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolbo

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-26 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r307929325
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implementation for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to the number of worker slots. This is used to execute the Task run() methods.
+ * - A control thread pool, sized to worker slots * 2. The control threads set up and submit work to the
+ *   task execution pool, and are also responsible for running graceful shutdown on the Task objects.
+ *   Only one shutdown per task can be running at a given time, and there is one control thread per task,
+ *   thus the pool has 2 * worker slots.
+ *
+ * Note that separate task logs are not supported; all task log entries will be written to the Indexer process log
+ * instead.
+ */
+public class ThreadingTaskRunner
+extends BaseRestorableTaskRunner
+implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final TaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolbo

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-28 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r308050764
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implementation for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to the number of worker slots. This is used to set up and execute the Task run() methods.
+ * - A control thread pool, sized to the number of worker slots. The control threads are responsible for running graceful
+ *   shutdown on the Task objects. Only one shutdown per task can be running at a given time,
+ *   so we allocate one control thread per worker slot.
+ *
+ * Note that separate task logs are not currently supported; all task log entries will be written to the Indexer
+ * process log instead.
+ */
+public class ThreadingTaskRunner
+extends BaseRestorableTaskRunner
+implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final MultipleFileTaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolboxFactory,
+  TaskConfig taskC
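
The two-pool layout described in the class Javadoc above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the PR: it uses plain java.util.concurrent executors instead of the Guava ListeningExecutorService fields shown in the diff, and the class name TwoPoolSketch and the methods run(), shutdown(), close() and demo() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: every name here is illustrative and not taken from the PR.
final class TwoPoolSketch
{
  // Runs the task bodies; one thread per worker slot.
  private final ExecutorService taskExecutor;
  // Runs graceful shutdown requests; also one thread per worker slot.
  private final ExecutorService controlThreadExecutor;

  TwoPoolSketch(int workerSlots)
  {
    this.taskExecutor = Executors.newFixedThreadPool(workerSlots);
    this.controlThreadExecutor = Executors.newFixedThreadPool(workerSlots);
  }

  // Submits a stand-in task that runs on the task pool until it is asked to stop.
  Future<String> run(String taskId, CountDownLatch stopSignal)
  {
    return taskExecutor.submit(() -> {
      stopSignal.await();
      return taskId + ":stopped";
    });
  }

  // Requests a graceful stop from a control thread, so the request never has to
  // wait behind task bodies that occupy the task pool.
  Future<?> shutdown(CountDownLatch stopSignal)
  {
    return controlThreadExecutor.submit(stopSignal::countDown);
  }

  void close()
  {
    taskExecutor.shutdownNow();
    controlThreadExecutor.shutdownNow();
  }

  // Starts one task per worker slot, stops each of them gracefully, and collects the results.
  static List<String> demo(int workerSlots) throws Exception
  {
    TwoPoolSketch runner = new TwoPoolSketch(workerSlots);
    try {
      List<CountDownLatch> signals = new ArrayList<>();
      List<Future<String>> results = new ArrayList<>();
      for (int i = 0; i < workerSlots; i++) {
        CountDownLatch signal = new CountDownLatch(1);
        signals.add(signal);
        results.add(runner.run("task" + i, signal));
      }
      for (CountDownLatch signal : signals) {
        runner.shutdown(signal);
      }
      List<String> out = new ArrayList<>();
      for (Future<String> result : results) {
        out.add(result.get(10, TimeUnit.SECONDS));
      }
      return out;
    }
    finally {
      runner.close();
    }
  }
}
```

Keeping shutdown on its own pool means a stop request can still be serviced when every task thread is busy, which appears to be the motivation for the separate control pool in the Javadoc.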

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-28 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r308050815
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implementation for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to the number of worker slots. This is used to set up and execute the Task run() methods.
+ * - A control thread pool, sized to the number of worker slots. The control threads are responsible for running graceful
+ *   shutdown on the Task objects. Only one shutdown per task can be running at a given time,
+ *   so we allocate one control thread per worker slot.
+ *
+ * Note that separate task logs are not currently supported; all task log entries will be written to the Indexer
+ * process log instead.
+ */
+public class ThreadingTaskRunner
+extends BaseRestorableTaskRunner
+implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final MultipleFileTaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolboxFactory,
+  TaskConfig taskC

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-28 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r308052606
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implementation for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to the number of worker slots. This is used to execute the Task run() methods.
+ * - A control thread pool, sized to worker slots * 2. The control threads set up and submit work to the
+ *   task execution pool, and are also responsible for running graceful shutdown on the Task objects.
+ *   Only one shutdown per task can be running at a given time, and there is one control thread per task,
+ *   thus the pool has 2 * worker slots.
+ *
+ * Note that separate task logs are not supported; all task log entries will be written to the Indexer process log
+ * instead.
+ */
+public class ThreadingTaskRunner
+extends BaseRestorableTaskRunner
+implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final TaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolbo
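
For readers following the review, the pool layout described in the Javadoc of this revision (a task pool with one thread per worker slot, and a control pool with 2 * worker slots threads that set up and submit work) can be sketched roughly as below. This is only an illustration using plain java.util.concurrent, not the PR's code: the class name TwoPoolSketch, the method runAll, and the toy "task-N:SUCCESS" status strings are all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch; names here are illustrative and do not come from the PR.
final class TwoPoolSketch
{
  /** Runs taskCount toy tasks and returns their statuses in submission order. */
  static List<String> runAll(int workerSlots, int taskCount) throws Exception
  {
    // Task execution pool: one thread per worker slot.
    ExecutorService taskExecutor = Executors.newFixedThreadPool(workerSlots);
    // Control pool: 2 * worker slots, as the Javadoc in this revision describes.
    ExecutorService controlExecutor = Executors.newFixedThreadPool(workerSlots * 2);
    try {
      List<Future<String>> controlFutures = new ArrayList<>();
      for (int i = 0; i < taskCount; i++) {
        final String taskId = "task-" + i;
        controlFutures.add(controlExecutor.submit(() -> {
          // A control thread would do per-task setup here, then hand run() to the task pool.
          Future<String> runFuture = taskExecutor.submit(() -> taskId + ":SUCCESS");
          // The control thread blocks until the task pool finishes the work.
          return runFuture.get();
        }));
      }
      List<String> statuses = new ArrayList<>();
      for (Future<String> f : controlFutures) {
        statuses.add(f.get());
      }
      return statuses;
    }
    finally {
      taskExecutor.shutdown();
      controlExecutor.shutdown();
    }
  }

  public static void main(String[] args) throws Exception
  {
    System.out.println(runAll(2, 5));
  }
}
```

Because the task pool never waits on the control pool, blocking a control thread on a task future cannot deadlock in this toy version; whether the same holds in the real runner depends on code not shown in this excerpt.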

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-28 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r308053737
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implementation for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to the number of worker slots. This is used to execute the Task run() methods.
+ * - A control thread pool, sized to worker slots * 2. The control threads set up and submit work to the
+ *   task execution pool, and are also responsible for running graceful shutdown on the Task objects.
+ *   Only one shutdown per task can be running at a given time, and there is one control thread per task,
+ *   thus the pool has 2 * worker slots.
+ *
+ * Note that separate task logs are not supported; all task log entries will be written to the Indexer process log
+ * instead.
+ */
+public class ThreadingTaskRunner
+extends BaseRestorableTaskRunner
+implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final TaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolbo

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8107: Add CliIndexer process type and initial task runner implementation

2019-07-29 Thread GitBox
himanshug commented on a change in pull request #8107: Add CliIndexer process 
type and initial task runner implementation
URL: https://github.com/apache/incubator-druid/pull/8107#discussion_r308431357
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 ##
 @@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskRunner implementation for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to the number of worker slots. This is used to set up and execute the Task run() methods.
+ * - A control thread pool, sized to the number of worker slots. The control threads are responsible for running graceful
+ *   shutdown on the Task objects. Only one shutdown per task can be running at a given time,
+ *   so we allocate one control thread per worker slot.
+ *
+ * Note that separate task logs are not currently supported; all task log entries will be written to the Indexer
+ * process log instead.
+ */
+public class ThreadingTaskRunner
+extends BaseRestorableTaskRunner
+implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final MultipleFileTaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+  TaskToolboxFactory toolboxFactory,
+  TaskConfig taskC
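
In this later revision the Javadoc moves setup into the task execution pool and leaves the control pool (one thread per worker slot) responsible only for graceful shutdown. A rough, self-contained sketch of that split follows. It is an assumption-laden illustration rather than the PR's implementation: ToyTask, stopGracefully, runThenShutdown, and the 5-second timeout are hypothetical names and values chosen for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names here are illustrative and do not come from the PR.
final class GracefulShutdownSketch
{
  /** Stand-in for a task whose run() blocks until a graceful stop is requested. */
  static final class ToyTask
  {
    private final CountDownLatch stopRequested = new CountDownLatch(1);

    String run() throws InterruptedException
    {
      stopRequested.await();
      return "STOPPED_GRACEFULLY";
    }

    void stopGracefully()
    {
      stopRequested.countDown();
    }
  }

  static String runThenShutdown(int workerSlots) throws Exception
  {
    // Both pools are sized to the number of worker slots in this revision's description.
    ExecutorService taskExecutor = Executors.newFixedThreadPool(workerSlots);
    ExecutorService controlExecutor = Executors.newFixedThreadPool(workerSlots);
    try {
      ToyTask task = new ToyTask();
      // The task pool thread sets up and runs the task.
      Callable<String> runCall = task::run;
      Future<String> statusFuture = taskExecutor.submit(runCall);
      // A control thread requests the graceful stop and waits for the task to finish.
      Callable<String> shutdownCall = () -> {
        task.stopGracefully();
        return statusFuture.get(5, TimeUnit.SECONDS);
      };
      return controlExecutor.submit(shutdownCall).get();
    }
    finally {
      taskExecutor.shutdownNow();
      controlExecutor.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception
  {
    System.out.println(runThenShutdown(1));
  }
}
```

Keeping shutdown on its own pool means a stop request is not queued behind a long-running run() call, which appears to be the motivation for allocating one control thread per worker slot.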
