Github user HanumathRao commented on a diff in the pull request: https://github.com/apache/drill/pull/1238#discussion_r183576093 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.drill.common.exceptions.UserException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion. + * TODO: look at switching to fork join. + * @param <V> The time value that will be returned when the task is executed. 
+ */ +public abstract class TimedCallable<V> implements Callable<V> { + private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class); + + private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000; + + private volatile long startTime = 0; + private volatile long executionTime = -1; + + private static class FutureMapper<V> implements Function<Future<V>, V> { + int count; + Throwable throwable = null; + + private void setThrowable(Throwable t) { + if (throwable == null) { + throwable = t; + } else { + throwable.addSuppressed(t); + } + } + + @Override + public V apply(Future<V> future) { + Preconditions.checkState(future.isDone()); + if (!future.isCancelled()) { + try { + count++; + return future.get(); + } catch (InterruptedException e) { + // there is no wait as we are getting result from the completed/done future + logger.error("Unexpected exception", e); + throw UserException.internalError(e) + .message("Unexpected exception") + .build(logger); + } catch (ExecutionException e) { + setThrowable(e.getCause()); + } + } else { + setThrowable(new CancellationException()); + } + return null; + } + } + + private static class Statistics<V> implements Consumer<TimedCallable<V>> { + final long start = System.nanoTime(); + final Stopwatch watch = Stopwatch.createStarted(); + long totalExecution = 0; + long maxExecution = 0; + int startedCount = 0; + private int doneCount = 0; + // measure thread creation times + long earliestStart = Long.MAX_VALUE; + long latestStart = 0; + long totalStart = 0; + + @Override + public void accept(TimedCallable<V> task) { + long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start; + if (threadStart >= 0) { + startedCount++; + earliestStart = Math.min(earliestStart, threadStart); + latestStart = Math.max(latestStart, threadStart); + totalStart += threadStart; + long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS); + if (executionTime != -1) { + doneCount++; + totalExecution += executionTime; + 
maxExecution = Math.max(maxExecution, executionTime); + } else { + logger.info("Task {} started at {} did not finish", task, threadStart); + } + } else { + logger.info("Task {} never commenced execution", task); + } + } + + void info(final String activity, final Logger logger, final List<TimedCallable<V>> tasks, int parallelism) { + tasks.forEach(this); --- End diff -- Would it be better to record whether the statistics have already been collected via tasks.forEach, and then only log them, instead of calling tasks.forEach(this) on every invocation of info? Each call to accept() adds to the running totals (startedCount, totalStart, doneCount, totalExecution), so calling info a second time would count every task twice. I know that info is currently called only once, but tracking this state would prevent double counting if another call is introduced in the future.
---