jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201195250
########## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ########## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize + * data and send an HTTP request. + */ +public abstract class IndexTaskClient implements AutoCloseable +{ + public static class NoTaskLocationException extends RuntimeException + { + public NoTaskLocationException(String message) + { + super(message); + } + } + + public static class TaskNotRunnableException extends RuntimeException + { + public TaskNotRunnableException(String message) + { + super(message); + } + } + + public static final int MAX_RETRY_WAIT_SECONDS = 10; + + private static final EmittingLogger log = new EmittingLogger(IndexTaskClient.class); + private static final String BASE_PATH = "/druid/worker/v1/chat"; + private static final int MIN_RETRY_WAIT_SECONDS = 2; + private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5; + + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + private final TaskInfoProvider taskInfoProvider; + private final Duration httpTimeout; + private final RetryPolicyFactory retryPolicyFactory; + private final ListeningExecutorService executorService; + + public IndexTaskClient( + HttpClient httpClient, + ObjectMapper objectMapper, + TaskInfoProvider taskInfoProvider, + Duration httpTimeout, + String callerId, + int numThreads, + long numRetries + ) + { + this.httpClient = httpClient; + this.objectMapper = objectMapper; + this.taskInfoProvider = taskInfoProvider; + this.httpTimeout = httpTimeout; + this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries); + this.executorService = MoreExecutors.listeningDecorator( + Execs.multiThreaded( + numThreads, + StringUtils.format( + "IndexTaskClient-%s-%%d", + callerId + ) + ) + ); + } + + private static RetryPolicyFactory initializeRetryPolicyFactory(long numRetries) + { + // Retries [numRetries] times before giving up; this should be set long enough to handle any temporary + // unresponsiveness such as network issues, if a task is still in the process of starting up, or if the task is in + // the middle of persisting to disk and doesn't respond immediately. + return new RetryPolicyFactory( + new RetryPolicyConfig() + .setMinWait(Period.seconds(MIN_RETRY_WAIT_SECONDS)) + .setMaxWait(Period.seconds(MAX_RETRY_WAIT_SECONDS)) + .setMaxRetryCount(numRetries) + ); + } + + protected HttpClient getHttpClient() + { + return httpClient; + } + + protected RetryPolicy newRetryPolicy() + { + return retryPolicyFactory.makeRetryPolicy(); + } + + protected <T> T deserialize(String content, TypeReference<T> typeReference) throws IOException + { + return objectMapper.readValue(content, typeReference); + } + + protected <T> T deserialize(String content, Class<T> typeReference) throws IOException + { + return objectMapper.readValue(content, typeReference); + } + + protected byte[] serialize(Object value) throws JsonProcessingException + { + return objectMapper.writeValueAsBytes(value); + } + + protected <T> ListenableFuture<T> doAsync(Callable<T> callable) + { + return executorService.submit(callable); + } + + protected boolean isSuccess(FullResponseHolder responseHolder) + { + return responseHolder.getStatus().getCode() / 100 == 2; + } + + @VisibleForTesting + protected void checkConnection(String host, int port) throws IOException + { + new Socket(host, port).close(); + } + + protected FullResponseHolder submitRequestWithEmptyContent( + String taskId, + HttpMethod method, + String pathSuffix, + @Nullable String query, + boolean retry + ) throws IOException, ChannelException, NoTaskLocationException + { + return submitRequest(taskId, null, method, pathSuffix, query, new byte[0], retry); + } + + /** + * To use this method, {@link #objectMapper} should be a jsonMapper. + */ + protected FullResponseHolder submitJsonRequest( + String taskId, + HttpMethod method, + String pathSuffix, + @Nullable String query, + byte[] content, + boolean retry + ) throws IOException, ChannelException, NoTaskLocationException + { + return submitRequest(taskId, MediaType.APPLICATION_JSON, method, pathSuffix, query, content, retry); + } + + /** + * To use this method, {@link #objectMapper} should be a smileMapper. + */ + protected FullResponseHolder submitSmileRequest( + String taskId, + HttpMethod method, + String pathSuffix, + @Nullable String query, + byte[] content, + boolean retry + ) throws IOException, ChannelException, NoTaskLocationException + { + return submitRequest(taskId, SmileMediaTypes.APPLICATION_JACKSON_SMILE, method, pathSuffix, query, content, retry); + } + + /** + * Sends an HTTP request to the task of the specified {@code taskId} and returns a response if it succeeded. + */ + private FullResponseHolder submitRequest( + String taskId, + @Nullable String mediaType, // nullable if content is empty + HttpMethod method, + String pathSuffix, + @Nullable String query, + byte[] content, + boolean retry + ) throws IOException, ChannelException, NoTaskLocationException + { + final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); + + while (true) { + FullResponseHolder response = null; + Request request = null; + TaskLocation location = TaskLocation.unknown(); + String path = StringUtils.format("%s/%s/%s", BASE_PATH, taskId, pathSuffix); + + Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(taskId); + if (!status.isPresent() || !status.get().isRunnable()) { + throw new TaskNotRunnableException(StringUtils.format( + "Aborting request because task [%s] is not runnable", + taskId + )); + } + + String host = location.getHost(); + String scheme = ""; + int port = -1; + + try { + location = taskInfoProvider.getTaskLocation(taskId); + if (location.equals(TaskLocation.unknown())) { + throw new NoTaskLocationException(StringUtils.format("No TaskLocation available for task [%s]", taskId)); + } + + host = location.getHost(); + scheme = location.getTlsPort() >= 0 ? "https" : "http"; + port = location.getTlsPort() >= 0 ? location.getTlsPort() : location.getPort(); Review comment: Hmm, I really didn't write this class. This line was added in https://github.com/apache/incubator-druid/pull/4270 and was originally in `KafkaIndexingTaskClient`. I extracted some common parts from `KafkaIndexTaskClient` to `IndexTaskClient`, so that `KafkaIndexTaskClient` and `ParallelIndexTaskClient`. Please check https://github.com/apache/incubator-druid/pull/5492#discussion_r198956260 as well. I think it's better to fix this class in a separate PR if needed because this PR is already big and I don't want to add unrelated changes. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org