[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203823447 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; 
+import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask Review comment: We have a rough plan for that. It's 'Two phase parallel indexing with shuffle' in https://github.com/apache/incubator-druid/issues/5543. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215000 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java ## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputSplit; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.task.TaskMonitor.MonitorEntry; +import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.appenderator.UsedSegmentChecker; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and + * monitor multiple {@link ParallelIndexSubTask}s. 
+ * + * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As + * a result, this task can't be used for perfect rollup. + */ +public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner +{ + private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class); + + private final TaskToolbox toolbox; + private final String taskId; + private final String groupId; + private final ParallelIndexIngestionSpec ingestionSchema; + private final Map context; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final int maxNumTasks; + private final IndexingServiceClient indexingServiceClient; + + private final BlockingQueue> taskCompleteEvents = + new LinkedBlockingDeque<>(); + + // subTaskId -> report + private final ConcurrentMap segmentsMap = new ConcurrentHashMap<>(); + + private volatile boolean stopped; + private volatile TaskMonitor taskMonitor; + + private int nextSpecId = 0; + + SinglePhaseParallelIndexTaskRunner( + TaskToolbox toolbox, + String taskId, + String groupId, + ParallelIndexIngestionSpec ingestionSchema, + Map context, + IndexingServiceClient indexingServiceClient + ) + { +this.toolbox = toolbox; +this.taskId = taskId; +this.groupId = groupId; +this.ingestionSchema = ingestionSchema; +this.context = context; +this.baseFirehoseFactory = (FiniteFirehoseFactory) ingestionSchema.getIOConfig().getFirehoseFactory(); +this.maxNumTasks = ingestionSchema.getTuningConfig().getMaxNumSubTasks(); +this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"); + } + + @Override + public TaskState run() throws Exception + { +final Iterator subTaskSpecIterator =
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203822871 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSupervisorTask.java ## @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.Counters; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; +import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; +import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import io.druid.indexing.common.task.ParallelIndexTaskRunner.SubTaskSpecStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlers; +import io.druid.server.security.Action; +import io.druid.server.security.AuthorizerMapper; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import 
javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.stream.Collectors; + +/** + * ParallelIndexSupervisorTask is capable of running multiple subTasks for parallel indexing. This is + * applicable if the input {@link FiniteFirehoseFactory} is splittable. While this task is running, it can submit + * multiple child tasks to overlords. This task succeeds only when all its child tasks succeed; otherwise it fails. + * + * @see ParallelIndexTaskRunner + */ +public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHandler +{ + static final String TYPE = "index_parallel"; + + private static final Logger log = new Logger(ParallelIndexSupervisorTask.class); + + private final ParallelIndexIngestionSpec ingestionSchema; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + private final RowIngestionMetersFactory rowIngestionMetersFactory; + + private final Counters counters = new Counters(); + + private volatile ParallelIndexTaskRunner runner; + + // toolbox is initialized when run() is called, and can be used for processing HTTP endpoint requests. + private volatile TaskToolbox toolbox; + + @JsonCreator + public ParallelIndexSupervisorTask( + @JsonProperty("id") String id, + @JsonProperty("resource") TaskResource taskResource, + @JsonProperty("spec") ParallelIndexIngestionSpec
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215047 ## File path: server/src/main/java/io/druid/client/indexing/ClientQuery.java ## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.indexing; + +public interface ClientQuery Review comment: Nice finding! Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215022 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java ## @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.client.indexing.TaskStatusResponse; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Responsible for submitting tasks, monitoring task statuses, resubmitting failed tasks, and returning the final task + * status. + */ +public class TaskMonitor +{ + private static final Logger log = new Logger(TaskMonitor.class); + + private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded(("task-monitor-%d")); + + /** + * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. This is + * read in {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and updated in {@link #submit} + * and {@link #retry}. This can also be read by calling {@link #getRunningTaskMonitorEntory}, + * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}. + */ + private final ConcurrentMap runningTasks = new ConcurrentHashMap<>(); + + /** + * A map of subTaskSpecId to {@link TaskHistory}. 
This map stores the history of complete {@link SubTaskSpec}s + * whether their final state is succeeded or failed. This is updated in {@link MonitorEntry#setLastStatus} which is + * called by the {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and can be + * read by outside of this class. + */ + private final ConcurrentMap> taskHistories = new ConcurrentHashMap<>(); + + // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks + private final Object taskCountLock = new Object(); + + // lock for updating running state + private final Object startStopLock = new Object(); + + // overlord client + private final IndexingServiceClient indexingServiceClient; + private final int maxRetry; + private final int expectedNumSucceededTasks; + + private int numRunningTasks; + private int numSucceededTasks; + private int numFailedTasks; + + private boolean running = false; + + TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks) + { +this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"); +this.maxRetry = maxRetry; +this.expectedNumSucceededTasks = expectedNumSucceededTasks; + +log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", expectedNumSucceededTasks); + } + + public void start(long taskStatusCheckingPeriod) + { +synchronized (startStopLock) { + running = true; + log.info("Starting taskMonitor"); + // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner. + // That listener should be able to send the events reported to TaskRunner to this TaskMonitor. + taskStatusChecker.scheduleAtFixedRate( + () -> { +try { + final Iterator> iterator = runningTasks.entrySet().iterator(); +
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215000 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java ## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputSplit; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.task.TaskMonitor.MonitorEntry; +import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.appenderator.UsedSegmentChecker; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and + * monitor multiple {@link ParallelIndexSubTask}s. 
+ * + * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As + * a result, this task can't be used for perfect rollup. + */ +public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner +{ + private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class); + + private final TaskToolbox toolbox; + private final String taskId; + private final String groupId; + private final ParallelIndexIngestionSpec ingestionSchema; + private final Map context; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final int maxNumTasks; + private final IndexingServiceClient indexingServiceClient; + + private final BlockingQueue> taskCompleteEvents = + new LinkedBlockingDeque<>(); + + // subTaskId -> report + private final ConcurrentMap segmentsMap = new ConcurrentHashMap<>(); + + private volatile boolean stopped; + private volatile TaskMonitor taskMonitor; + + private int nextSpecId = 0; + + SinglePhaseParallelIndexTaskRunner( + TaskToolbox toolbox, + String taskId, + String groupId, + ParallelIndexIngestionSpec ingestionSchema, + Map context, + IndexingServiceClient indexingServiceClient + ) + { +this.toolbox = toolbox; +this.taskId = taskId; +this.groupId = groupId; +this.ingestionSchema = ingestionSchema; +this.context = context; +this.baseFirehoseFactory = (FiniteFirehoseFactory) ingestionSchema.getIOConfig().getFirehoseFactory(); +this.maxNumTasks = ingestionSchema.getTuningConfig().getMaxNumSubTasks(); +this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"); + } + + @Override + public TaskState run() throws Exception + { +final Iterator subTaskSpecIterator =
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203214991 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java ## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputSplit; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.task.TaskMonitor.MonitorEntry; +import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.appenderator.UsedSegmentChecker; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and + * monitor multiple {@link ParallelIndexSubTask}s. 
+ * + * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As + * a result, this task can't be used for perfect rollup. + */ +public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner +{ + private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class); + + private final TaskToolbox toolbox; + private final String taskId; + private final String groupId; + private final ParallelIndexIngestionSpec ingestionSchema; + private final Map context; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final int maxNumTasks; + private final IndexingServiceClient indexingServiceClient; + + private final BlockingQueue> taskCompleteEvents = + new LinkedBlockingDeque<>(); + + // subTaskId -> report + private final ConcurrentMap segmentsMap = new ConcurrentHashMap<>(); + + private volatile boolean stopped; + private volatile TaskMonitor taskMonitor; + + private int nextSpecId = 0; + + SinglePhaseParallelIndexTaskRunner( + TaskToolbox toolbox, + String taskId, + String groupId, + ParallelIndexIngestionSpec ingestionSchema, + Map context, + IndexingServiceClient indexingServiceClient + ) + { +this.toolbox = toolbox; +this.taskId = taskId; +this.groupId = groupId; +this.ingestionSchema = ingestionSchema; +this.context = context; +this.baseFirehoseFactory = (FiniteFirehoseFactory) ingestionSchema.getIOConfig().getFirehoseFactory(); +this.maxNumTasks = ingestionSchema.getTuningConfig().getMaxNumSubTasks(); +this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"); + } + + @Override + public TaskState run() throws Exception + { +final Iterator subTaskSpecIterator =
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203215030 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; 
+import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask Review comment: Those are the main differences; another is that `ParallelIndexSubTask` doesn't collect HLL, which is required for perfect rollup. The `IndexTask` would be deprecated once parallel indexing supports perfect rollup and becomes stable, so I think it's easier to maintain both tasks while developing, and delete the deprecated one later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203214983 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSupervisorTask.java ## @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.Counters; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; +import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; +import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import io.druid.indexing.common.task.ParallelIndexTaskRunner.SubTaskSpecStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlers; +import io.druid.server.security.Action; +import io.druid.server.security.AuthorizerMapper; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import 
javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.stream.Collectors; + +/** + * ParallelIndexSupervisorTask is capable of running multiple subTasks for parallel indexing. This is + * applicable if the input {@link FiniteFirehoseFactory} is splittable. While this task is running, it can submit + * multiple child tasks to overlords. This task succeeds only when all its child tasks succeed; otherwise it fails. + * + * @see ParallelIndexTaskRunner + */ +public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHandler +{ + static final String TYPE = "index_parallel"; + + private static final Logger log = new Logger(ParallelIndexSupervisorTask.class); + + private final ParallelIndexIngestionSpec ingestionSchema; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + private final RowIngestionMetersFactory rowIngestionMetersFactory; + + private final Counters counters = new Counters(); + + private volatile ParallelIndexTaskRunner runner; + + // toolbox is initialized when run() is called, and can be used for processing HTTP endpoint requests. + private volatile TaskToolbox toolbox; + + @JsonCreator + public ParallelIndexSupervisorTask( + @JsonProperty("id") String id, + @JsonProperty("resource") TaskResource taskResource, + @JsonProperty("spec") ParallelIndexIngestionSpec
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203169187 ## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize + * data and send an HTTP request. 
+ */ +public abstract class IndexTaskClient implements AutoCloseable +{ + public static class NoTaskLocationException extends RuntimeException + { +public NoTaskLocationException(String message) +{ + super(message); +} + } + + public static class TaskNotRunnableException extends RuntimeException + { +public TaskNotRunnableException(String message) +{ + super(message); +} + } + + public static final int MAX_RETRY_WAIT_SECONDS = 10; + + private static final EmittingLogger log = new EmittingLogger(IndexTaskClient.class); + private static final String BASE_PATH = "/druid/worker/v1/chat"; + private static final int MIN_RETRY_WAIT_SECONDS = 2; + private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5; + + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + private final TaskInfoProvider taskInfoProvider; + private final Duration httpTimeout; + private final RetryPolicyFactory retryPolicyFactory; + private final ListeningExecutorService executorService; + + public IndexTaskClient( + HttpClient httpClient, + ObjectMapper objectMapper, + TaskInfoProvider taskInfoProvider, + Duration httpTimeout, + String callerId, + int numThreads, + long numRetries + ) + { +this.httpClient = httpClient; +this.objectMapper = objectMapper; +this.taskInfoProvider = taskInfoProvider; +this.httpTimeout = httpTimeout; +this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries); +this.executorService = MoreExecutors.listeningDecorator( +Execs.multiThreaded( +numThreads, +StringUtils.format( +"IndexTaskClient-%s-%%d", +callerId +) +) +); + } + + private static RetryPolicyFactory initializeRetryPolicyFactory(long numRetries) + { +// Retries [numRetries] times before giving up; this should be set long enough to handle any temporary +// unresponsiveness such as network issues, if
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r203167880 ## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize Review comment: I think this should be a separate issue. This code already exists in master. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r202896825 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java ## @@ -0,0 +1,502 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.client.indexing.TaskStatusResponse; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Responsible for submitting tasks, monitoring task statuses, resubmitting failed tasks, and returning the final task + * status. + */ +public class TaskMonitor +{ + private static final Logger log = new Logger(TaskMonitor.class); + + private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded(("task-monitor-%d")); + + /** + * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. This is + * read in {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and updated in {@link #submit} + * and {@link #retry}. This can also be read by calling {@link #getRunningTaskMonitorEntory}, + * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}. 
+ */ + private final ConcurrentMap runningTasks = new ConcurrentHashMap<>(); + + /** + * A map of subTaskSpecId to {@link TaskHistory}. This map stores the history of complete {@link SubTaskSpec}s + * whether their final state is succeeded or failed. This is updated in {@link MonitorEntry#setLastStatus} which is + * called by the {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and can be + * read by outside of this class. + */ + private final ConcurrentMap> taskHistories = new ConcurrentHashMap<>(); + + // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks + private final Object taskCountLock = new Object(); + + // overlord client + private final IndexingServiceClient indexingServiceClient; + private final int maxRetry; + private final int expectedNumSucceededTasks; + + private int numRunningTasks; + private int numSucceededTasks; + private int numFailedTasks; + + private volatile boolean running = false; + + TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks) + { +this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"); +this.maxRetry = maxRetry; +this.expectedNumSucceededTasks = expectedNumSucceededTasks; + +log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", expectedNumSucceededTasks); + } + + public void start(long taskStatusCheckingPeriod) + { +running = true; +log.info("Starting taskMonitor"); +// NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner. +// That listener should be able to send the events reported to TaskRunner to this TaskMonitor. +taskStatusChecker.scheduleAtFixedRate( +() -> { + try { +final Iterator> iterator = runningTasks.entrySet().iterator(); +while
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201892300 ## File path: indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java ## @@ -465,6 +467,23 @@ public RemoteTaskRunnerConfig getConfig() return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values())); } + @Nullable + @Override + public RunnerTaskState getRunnerTaskState(String taskId) Review comment: What is the benefit of using Optional instead of returning null here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201892018 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java ## @@ -0,0 +1,502 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.client.indexing.TaskStatusResponse; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Responsible for submitting tasks, monitoring task statuses, resubmitting failed tasks, and returning the final task + * status. + */ +public class TaskMonitor +{ + private static final Logger log = new Logger(TaskMonitor.class); + + private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded(("task-monitor-%d")); + + /** + * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. This is + * read in {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and updated in {@link #submit} + * and {@link #retry}. This can also be read by calling {@link #getRunningTaskMonitorEntory}, + * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}. 
+ */ + private final ConcurrentMap runningTasks = new ConcurrentHashMap<>(); + + /** + * A map of subTaskSpecId to {@link TaskHistory}. This map stores the history of complete {@link SubTaskSpec}s + * whether their final state is succeeded or failed. This is updated in {@link MonitorEntry#setLastStatus} which is + * called by the {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and can be + * read by outside of this class. + */ + private final ConcurrentMap> taskHistories = new ConcurrentHashMap<>(); + + // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks + private final Object taskCountLock = new Object(); + + // overlord client + private final IndexingServiceClient indexingServiceClient; + private final int maxRetry; + private final int expectedNumSucceededTasks; + + private int numRunningTasks; + private int numSucceededTasks; + private int numFailedTasks; + + private volatile boolean running = false; + + TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks) + { +this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"); +this.maxRetry = maxRetry; +this.expectedNumSucceededTasks = expectedNumSucceededTasks; + +log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", expectedNumSucceededTasks); + } + + public void start(long taskStatusCheckingPeriod) + { +running = true; +log.info("Starting taskMonitor"); +// NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner. +// That listener should be able to send the events reported to TaskRunner to this TaskMonitor. +taskStatusChecker.scheduleAtFixedRate( +() -> { + try { +final Iterator> iterator = runningTasks.entrySet().iterator(); Review comment:
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891941 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java ## @@ -0,0 +1,624 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputSplit; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status; +import io.druid.indexing.common.task.TaskMonitor.MonitorEntry; +import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent; +import io.druid.indexing.common.task.TaskMonitor.TaskHistory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.appenderator.UsedSegmentChecker; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlers; +import io.druid.server.security.Action; +import io.druid.server.security.AuthorizerMapper; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; 
+import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and + * monitor multiple {@link ParallelIndexSubTask}s. + * + * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As + * a result, this task can't be used for perfect rollup. + */ +public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner, ChatHandler +{ + private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class); + + private final String taskId; + private final String groupId; + private final ParallelIndexIngestionSpec ingestionSchema; + private final Map context; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final int maxNumTasks; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + + private final BlockingQueue> taskCompleteEvents = + new LinkedBlockingDeque<>(); + + //
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201892027 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/TaskMonitor.java ## @@ -0,0 +1,502 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.client.indexing.TaskStatusResponse; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Responsible for submitting tasks, monitoring task statuses, resubmitting failed tasks, and returning the final task + * status. + */ +public class TaskMonitor +{ + private static final Logger log = new Logger(TaskMonitor.class); + + private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded(("task-monitor-%d")); + + /** + * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. This is + * read in {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and updated in {@link #submit} + * and {@link #retry}. This can also be read by calling {@link #getRunningTaskMonitorEntory}, + * {@link #getRunningTaskIds}, and {@link #getRunningSubTaskSpecs}. 
+ */ + private final ConcurrentMap runningTasks = new ConcurrentHashMap<>(); + + /** + * A map of subTaskSpecId to {@link TaskHistory}. This map stores the history of complete {@link SubTaskSpec}s + * whether their final state is succeeded or failed. This is updated in {@link MonitorEntry#setLastStatus} which is + * called by the {@link java.util.concurrent.Callable} executed by {@link #taskStatusChecker} and can be + * read by outside of this class. + */ + private final ConcurrentMap> taskHistories = new ConcurrentHashMap<>(); + + // lock for updating numRunningTasks, numSucceededTasks, and numFailedTasks + private final Object taskCountLock = new Object(); + + // overlord client + private final IndexingServiceClient indexingServiceClient; + private final int maxRetry; + private final int expectedNumSucceededTasks; + + private int numRunningTasks; + private int numSucceededTasks; + private int numFailedTasks; + + private volatile boolean running = false; + + TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks) + { +this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"); +this.maxRetry = maxRetry; +this.expectedNumSucceededTasks = expectedNumSucceededTasks; + +log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", expectedNumSucceededTasks); + } + + public void start(long taskStatusCheckingPeriod) + { +running = true; +log.info("Starting taskMonitor"); +// NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner. +// That listener should be able to send the events reported to TaskRunner to this TaskMonitor. +taskStatusChecker.scheduleAtFixedRate( +() -> { + try { +final Iterator> iterator = runningTasks.entrySet().iterator(); +while
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891972 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java ## @@ -0,0 +1,624 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputSplit; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status; +import io.druid.indexing.common.task.TaskMonitor.MonitorEntry; +import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent; +import io.druid.indexing.common.task.TaskMonitor.TaskHistory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.appenderator.UsedSegmentChecker; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlers; +import io.druid.server.security.Action; +import io.druid.server.security.AuthorizerMapper; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; 
+import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and + * monitor multiple {@link ParallelIndexSubTask}s. + * + * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As + * a result, this task can't be used for perfect rollup. + */ +public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner, ChatHandler +{ + private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class); + + private final String taskId; + private final String groupId; + private final ParallelIndexIngestionSpec ingestionSchema; + private final Map context; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final int maxNumTasks; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + + private final BlockingQueue> taskCompleteEvents = + new LinkedBlockingDeque<>(); + + //
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891892 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskRunner.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.indexing.common.task; + +import io.druid.indexer.TaskState; +import io.druid.indexing.common.TaskToolbox; + +/** + * ParallelIndexTaskRunner is the actual task runner of {@link ParallelIndexSupervisorTask}. There is currently a single + * implementation, i.e. {@link SinglePhaseParallelIndexTaskRunner} which supports only best-effort roll-up. We can add + * more implementations in the future. + */ +public interface ParallelIndexTaskRunner +{ + TaskState run(TaskToolbox toolbox) throws Exception; Review comment: So, `ParallelIndexTaskRunner` is the one actually processing the task. It can have multiple implementations (maybe two) based on the distributed indexing algorithm described in https://github.com/apache/incubator-druid/issues/5543. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891935 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java ## @@ -0,0 +1,624 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputSplit; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status; +import io.druid.indexing.common.task.TaskMonitor.MonitorEntry; +import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent; +import io.druid.indexing.common.task.TaskMonitor.TaskHistory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.appenderator.UsedSegmentChecker; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlers; +import io.druid.server.security.Action; +import io.druid.server.security.AuthorizerMapper; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; 
+import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and + * monitor multiple {@link ParallelIndexSubTask}s. + * + * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As + * a result, this task can't be used for perfect rollup. + */ +public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner, ChatHandler +{ + private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class); + + private final String taskId; + private final String groupId; + private final ParallelIndexIngestionSpec ingestionSchema; + private final Map context; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final int maxNumTasks; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + + private final BlockingQueue> taskCompleteEvents = + new LinkedBlockingDeque<>(); + + //
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891918 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/PushedSegmentsReport.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.timeline.DataSegment; + +import java.util.List; + +public class PushedSegmentsReport Review comment: Just the native parallel batch one. Added javadoc. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891941 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/SinglePhaseParallelIndexTaskRunner.java ## @@ -0,0 +1,624 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputSplit; +import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatusPlus; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.task.ParallelIndexSupervisorTask.Status; +import io.druid.indexing.common.task.TaskMonitor.MonitorEntry; +import io.druid.indexing.common.task.TaskMonitor.SubTaskCompleteEvent; +import io.druid.indexing.common.task.TaskMonitor.TaskHistory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.appenderator.UsedSegmentChecker; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.ChatHandlers; +import io.druid.server.security.Action; +import io.druid.server.security.AuthorizerMapper; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; 
+import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and + * monitor multiple {@link ParallelIndexSubTask}s. + * + * As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As + * a result, this task can't be used for perfect rollup. + */ +public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunner, ChatHandler +{ + private static final Logger log = new Logger(SinglePhaseParallelIndexTaskRunner.class); + + private final String taskId; + private final String groupId; + private final ParallelIndexIngestionSpec ingestionSchema; + private final Map context; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final int maxNumTasks; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + + private final BlockingQueue> taskCompleteEvents = + new LinkedBlockingDeque<>(); + + //
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891798 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; Review comment: That would require a lot of changes in `GranularitySpec` and all of its call sites. It should be done in a separate PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891847 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask +{ + static final String TYPE = "index_sub"; + + private static final Logger log = new Logger(ParallelIndexSubTask.class); + + private final int numAttempts; + private final ParallelIndexIngestionSpec ingestionSchema; + private final String supervisorTaskId; + private final IndexingServiceClient indexingServiceClient; + private final IndexTaskClientFactory taskClientFactory; + + @JsonCreator + public ParallelIndexSubTask( + // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask + @JsonProperty("id") @Nullable final String id, +
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891822 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask +{ + static final String TYPE = "index_sub"; + + private static final Logger log = new Logger(ParallelIndexSubTask.class); + + private final int numAttempts; + private final ParallelIndexIngestionSpec ingestionSchema; + private final String supervisorTaskId; + private final IndexingServiceClient indexingServiceClient; + private final IndexTaskClientFactory taskClientFactory; + + @JsonCreator + public ParallelIndexSubTask( + // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask + @JsonProperty("id") @Nullable final String id, +
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891862 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskClient.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.indexing.common.IndexTaskClient; +import io.druid.indexing.common.TaskInfoProvider; +import io.druid.java.util.common.ISE; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.timeline.DataSegment; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.Duration; + +import java.io.IOException; +import java.util.List; + +public class ParallelIndexTaskClient extends IndexTaskClient +{ + private final String subtaskId; + + public ParallelIndexTaskClient( Review comment: Changed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891892 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexTaskRunner.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.indexing.common.task; + +import io.druid.indexer.TaskState; +import io.druid.indexing.common.TaskToolbox; + +/** + * ParallelIndexTaskRunner is the actual task runner of {@link ParallelIndexSupervisorTask}. There is currently a single + * implementation, i.e. {@link SinglePhaseParallelIndexTaskRunner} which supports only best-effort roll-up. We can add + * more implementations in the future. + */ +public interface ParallelIndexTaskRunner +{ + TaskState run(TaskToolbox toolbox) throws Exception; Review comment: So, `ParallelIndexTaskRunner` is the one actually processing the task. It can have multiple implementations based on the distributed indexing algorithm described in https://github.com/apache/incubator-druid/issues/5543. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891785 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; Review comment: Good catch! Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891803 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask +{ + static final String TYPE = "index_sub"; + + private static final Logger log = new Logger(ParallelIndexSubTask.class); + + private final int numAttempts; + private final ParallelIndexIngestionSpec ingestionSchema; + private final String supervisorTaskId; + private final IndexingServiceClient indexingServiceClient; + private final IndexTaskClientFactory taskClientFactory; + + @JsonCreator + public ParallelIndexSubTask( + // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask + @JsonProperty("id") @Nullable final String id, +
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891856 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSupervisorTask.java ## @@ -0,0 +1,322 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.FiniteFirehoseFactory; +import io.druid.data.input.FirehoseFactory; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; +import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; +import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.server.security.AuthorizerMapper; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; + +/** + * ParallelIndexSupervisorTask is capable of running multiple subTasks for parallel indexing. This is + * applicable if the input {@link FiniteFirehoseFactory} is splittable. While this task is running, it can submit + * multiple child tasks to overlords. This task succeeds only when all its child tasks succeed; otherwise it fails. 
+ * + * @see ParallelIndexTaskRunner + */ +public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHandler +{ + static final String TYPE = "index_parallel"; + + private static final Logger log = new Logger(ParallelIndexSupervisorTask.class); + + private final ParallelIndexIngestionSpec ingestionSchema; + private final FiniteFirehoseFactory baseFirehoseFactory; + private final IndexingServiceClient indexingServiceClient; + private final ChatHandlerProvider chatHandlerProvider; + private final AuthorizerMapper authorizerMapper; + private final RowIngestionMetersFactory rowIngestionMetersFactory; + + private ParallelIndexTaskRunner runner; + + @JsonCreator + public ParallelIndexSupervisorTask( + @JsonProperty("id") String id, + @JsonProperty("resource") TaskResource taskResource, + @JsonProperty("spec") ParallelIndexIngestionSpec ingestionSchema, + @JsonProperty("context") Map context, + @JacksonInject @Nullable IndexingServiceClient indexingServiceClient, // null in overlords + @JacksonInject @Nullable ChatHandlerProvider chatHandlerProvider, // null in overlords + @JacksonInject AuthorizerMapper authorizerMapper, + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory + ) + { +super( +getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()), +null, +taskResource, +ingestionSchema.getDataSchema().getDataSource(), +context +); + +this.ingestionSchema = ingestionSchema; + +final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); +if (!(firehoseFactory instanceof FiniteFirehoseFactory)) { + throw new IAE("[%s] should implement FiniteFirehoseFactory", firehoseFactory.getClass().getSimpleName()); +} + +this.baseFirehoseFactory = (FiniteFirehoseFactory) firehoseFactory; +this.indexingServiceClient = indexingServiceClient; +this.chatHandlerProvider = chatHandlerProvider; +this.authorizerMapper = authorizerMapper; +this.rowIngestionMetersFactory = rowIngestionMetersFactory; + +if 
(ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() > 0) { +
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891827 ## File path: indexing-service/src/main/java/io/druid/indexing/common/task/ParallelIndexSubTask.java ## @@ -0,0 +1,451 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.indexer.TaskStatus; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.appenderator.CountingActionBasedSegmentAllocator; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskLockType; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SurrogateAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import 
io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.timeline.DataSegment; +import org.codehaus.plexus.util.FileUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * A worker task of {@link ParallelIndexSupervisorTask}. Similar to {@link IndexTask}, but this task + * generates and pushes segments, and reports them to the {@link ParallelIndexSupervisorTask} instead of + * publishing on its own. + */ +public class ParallelIndexSubTask extends AbstractTask +{ + static final String TYPE = "index_sub"; + + private static final Logger log = new Logger(ParallelIndexSubTask.class); + + private final int numAttempts; + private final ParallelIndexIngestionSpec ingestionSchema; + private final String supervisorTaskId; + private final IndexingServiceClient indexingServiceClient; + private final IndexTaskClientFactory taskClientFactory; + + @JsonCreator + public ParallelIndexSubTask( + // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask + @JsonProperty("id") @Nullable final String id, +
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891749 ## File path: indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.druid.indexing.common.Counters; +import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.SortedSet; + +/** + * This action is to find a proper {@link io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of + * the next shard number per {@link Interval} in {@link Counters}. The next shard number is incremented by 1 whenever a + * new {@link SegmentIdentifier} is allocated. 
+ */ +public class CountingSegmentAllocateAction implements TaskAction +{ + private final String dataSource; + private final DateTime timestamp; + private final GranularitySpec granularitySpec; + @JsonDeserialize(keyUsing = IntervalDeserializer.class) + private final Map versions; + + private final SortedSet bucketIntervals; + + @JsonCreator + public CountingSegmentAllocateAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("timestamp") DateTime timestamp, + @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("versions") Map versions + ) + { +this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); +this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp"); +this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec"); +this.versions = Preconditions.checkNotNull(versions, "versions"); + +this.bucketIntervals = Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), "bucketIntervals"); + } + + @JsonProperty + public String getDataSource() + { +return dataSource; + } + + @JsonProperty + public DateTime getTimestamp() + { +return timestamp; + } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { +return granularitySpec; + } + + @JsonProperty + public Map getVersions() + { +return versions; + } + + @Override + public TypeReference getReturnTypeReference() + { +return new TypeReference() +{ +}; + } + + @Override + public SegmentIdentifier perform(Task task, TaskActionToolbox toolbox) + { +Optional maybeInterval = granularitySpec.bucketInterval(timestamp); +if (!maybeInterval.isPresent()) { + throw new ISE("Could not find interval for timestamp [%s]", timestamp); +} + +final Interval interval = maybeInterval.get(); +if (!bucketIntervals.contains(interval)) { + throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec); +} + +final Counters counters = toolbox.getCounters(); Review comment: Oh, thanks for 
finding this! `Counters` is expected to be created per task, not a singleton. Will fix this soon. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891665 ## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize + * data and send an HTTP request. 
+ */ +public abstract class IndexTaskClient implements AutoCloseable +{ + public static class NoTaskLocationException extends RuntimeException + { +public NoTaskLocationException(String message) +{ + super(message); +} + } + + public static class TaskNotRunnableException extends RuntimeException + { +public TaskNotRunnableException(String message) +{ + super(message); +} + } + + public static final int MAX_RETRY_WAIT_SECONDS = 10; + + private static final EmittingLogger log = new EmittingLogger(IndexTaskClient.class); + private static final String BASE_PATH = "/druid/worker/v1/chat"; + private static final int MIN_RETRY_WAIT_SECONDS = 2; + private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5; + + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + private final TaskInfoProvider taskInfoProvider; + private final Duration httpTimeout; + private final RetryPolicyFactory retryPolicyFactory; + private final ListeningExecutorService executorService; + + public IndexTaskClient( + HttpClient httpClient, + ObjectMapper objectMapper, + TaskInfoProvider taskInfoProvider, + Duration httpTimeout, + String callerId, + int numThreads, + long numRetries + ) + { +this.httpClient = httpClient; +this.objectMapper = objectMapper; +this.taskInfoProvider = taskInfoProvider; +this.httpTimeout = httpTimeout; +this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries); +this.executorService = MoreExecutors.listeningDecorator( +Execs.multiThreaded( +numThreads, +StringUtils.format( +"IndexTaskClient-%s-%%d", +callerId +) +) +); + } + + private static RetryPolicyFactory initializeRetryPolicyFactory(long numRetries) + { +// Retries [numRetries] times before giving up; this should be set long enough to handle any temporary +// unresponsiveness such as network issues, if
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891629 ## File path: indexing-service/src/main/java/io/druid/indexing/common/Counters.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.google.common.util.concurrent.AtomicDouble; Review comment: Yes, only the google common one is available. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891657 ## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize + * data and send an HTTP request. 
+ */ +public abstract class IndexTaskClient implements AutoCloseable +{ + public static class NoTaskLocationException extends RuntimeException + { +public NoTaskLocationException(String message) +{ + super(message); +} + } + + public static class TaskNotRunnableException extends RuntimeException + { +public TaskNotRunnableException(String message) +{ + super(message); +} + } + + public static final int MAX_RETRY_WAIT_SECONDS = 10; + + private static final EmittingLogger log = new EmittingLogger(IndexTaskClient.class); + private static final String BASE_PATH = "/druid/worker/v1/chat"; + private static final int MIN_RETRY_WAIT_SECONDS = 2; + private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5; + + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + private final TaskInfoProvider taskInfoProvider; + private final Duration httpTimeout; + private final RetryPolicyFactory retryPolicyFactory; + private final ListeningExecutorService executorService; + + public IndexTaskClient( + HttpClient httpClient, + ObjectMapper objectMapper, + TaskInfoProvider taskInfoProvider, + Duration httpTimeout, + String callerId, + int numThreads, + long numRetries + ) + { +this.httpClient = httpClient; +this.objectMapper = objectMapper; +this.taskInfoProvider = taskInfoProvider; +this.httpTimeout = httpTimeout; +this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries); +this.executorService = MoreExecutors.listeningDecorator( +Execs.multiThreaded( +numThreads, +StringUtils.format( +"IndexTaskClient-%s-%%d", +callerId +) +) +); + } + + private static RetryPolicyFactory initializeRetryPolicyFactory(long numRetries) + { +// Retries [numRetries] times before giving up; this should be set long enough to handle any temporary +// unresponsiveness such as network issues, if
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891707 ## File path: indexing-service/src/main/java/io/druid/indexing/common/actions/CountingSegmentAllocateAction.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import io.druid.indexing.common.Counters; +import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.SortedSet; + +/** + * This action is to find a proper {@link io.druid.timeline.partition.ShardSpec} based on counting. This keeps track of + * the next shard number per {@link Interval} in {@link Counters}. The next shard number is incremented by 1 whenever a + * new {@link SegmentIdentifier} is allocated. 
+ */ +public class CountingSegmentAllocateAction implements TaskAction +{ + private final String dataSource; + private final DateTime timestamp; + private final GranularitySpec granularitySpec; + @JsonDeserialize(keyUsing = IntervalDeserializer.class) + private final Map versions; + + private final SortedSet bucketIntervals; + + @JsonCreator + public CountingSegmentAllocateAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("timestamp") DateTime timestamp, + @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("versions") Map versions + ) + { +this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); +this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp"); +this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec"); +this.versions = Preconditions.checkNotNull(versions, "versions"); + +this.bucketIntervals = Preconditions.checkNotNull(granularitySpec.bucketIntervals().orNull(), "bucketIntervals"); + } + + @JsonProperty + public String getDataSource() + { +return dataSource; + } + + @JsonProperty + public DateTime getTimestamp() + { +return timestamp; + } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { +return granularitySpec; + } + + @JsonProperty + public Map getVersions() + { +return versions; + } + + @Override + public TypeReference getReturnTypeReference() + { +return new TypeReference() Review comment: Hmm, what is the benefit of making this to a static final variable here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891619 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java ## @@ -50,5 +53,8 @@ @Override public void configure(Binder binder) { +binder.bind(new TypeLiteral>(){}) Review comment: Good catch. Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891640 ## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize Review comment: `IndexTaskClient` is theoretically can be used by any nodes, but currently is being used by both overlords and middleManagers (more precisely peons). 
In the Kafka indexing service, the supervisor (which is run on an overlord) is using this to communicate with kafkaIndexTasks. In native batch indexing, the supervisorTask (which is run on a peon) is using this to communicate with subTasks. I think there should be no significant issue here because each subTask calls a REST API of the supervisorTask only one time before it is finished. > If that is true what are the considerations for number of http server threads the overlord needs compared to settings here for large clusters? Maybe it's a problem. I'm not sure about this. @gianm any thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891627 ## File path: indexing-service/src/main/java/io/druid/indexing/appenderator/SegmentAllocateActionGenerator.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.indexing.appenderator; + +import io.druid.data.input.InputRow; +import io.druid.indexing.common.actions.TaskAction; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; + +public interface SegmentAllocateActionGenerator +{ + TaskAction generate( Review comment: Added. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891613 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java ## @@ -362,317 +288,70 @@ public boolean setEndOffsets( log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize); try { - final FullResponseHolder response = submitRequest( + final FullResponseHolder response = submitJsonRequest( id, HttpMethod.POST, "offsets/end", StringUtils.format("finish=%s", finalize), - jsonMapper.writeValueAsBytes(endOffsets), + serialize(endOffsets), true ); - return response.getStatus().getCode() / 100 == 2; + return isSuccess(response); } catch (NoTaskLocationException e) { return false; } catch (IOException e) { - throw new RuntimeException(e); + throw Throwables.propagate(e); } } public ListenableFuture stopAsync(final String id, final boolean publish) { -return executorService.submit( -new Callable() -{ - @Override - public Boolean call() - { -return stop(id, publish); - } -} -); +return doAsync(() -> stop(id, publish)); } public ListenableFuture resumeAsync(final String id) { -return executorService.submit( -new Callable() -{ - @Override - public Boolean call() - { -return resume(id); - } -} -); +return doAsync(() -> resume(id)); Review comment: Yes, we can use CompletableFuture instead. Probably related to https://github.com/apache/incubator-druid/issues/5415. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891602 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java ## @@ -362,317 +288,70 @@ public boolean setEndOffsets( log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize); try { - final FullResponseHolder response = submitRequest( + final FullResponseHolder response = submitJsonRequest( id, HttpMethod.POST, "offsets/end", StringUtils.format("finish=%s", finalize), - jsonMapper.writeValueAsBytes(endOffsets), + serialize(endOffsets), true ); - return response.getStatus().getCode() / 100 == 2; + return isSuccess(response); } catch (NoTaskLocationException e) { return false; } catch (IOException e) { - throw new RuntimeException(e); + throw Throwables.propagate(e); Review comment: Good point. Changed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891581 ## File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java ## @@ -180,18 +113,17 @@ public boolean resume(final String id) if (response.getStatus().equals(HttpResponseStatus.OK)) { log.info("Task [%s] paused successfully", id); -return jsonMapper.readValue(response.getContent(), new TypeReference>() +return deserialize(response.getContent(), new TypeReference>() Review comment: What do you mean? The returned object is a map and used by `KafkaSupervisor`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org
[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201195250 ## File path: indexing-service/src/main/java/io/druid/indexing/common/IndexTaskClient.java ## @@ -0,0 +1,381 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.segment.realtime.firehose.ChatHandlerResource; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +/** + * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize + * data and send an HTTP request. 
+ */ +public abstract class IndexTaskClient implements AutoCloseable +{ + public static class NoTaskLocationException extends RuntimeException + { +public NoTaskLocationException(String message) +{ + super(message); +} + } + + public static class TaskNotRunnableException extends RuntimeException + { +public TaskNotRunnableException(String message) +{ + super(message); +} + } + + public static final int MAX_RETRY_WAIT_SECONDS = 10; + + private static final EmittingLogger log = new EmittingLogger(IndexTaskClient.class); + private static final String BASE_PATH = "/druid/worker/v1/chat"; + private static final int MIN_RETRY_WAIT_SECONDS = 2; + private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5; + + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + private final TaskInfoProvider taskInfoProvider; + private final Duration httpTimeout; + private final RetryPolicyFactory retryPolicyFactory; + private final ListeningExecutorService executorService; + + public IndexTaskClient( + HttpClient httpClient, + ObjectMapper objectMapper, + TaskInfoProvider taskInfoProvider, + Duration httpTimeout, + String callerId, + int numThreads, + long numRetries + ) + { +this.httpClient = httpClient; +this.objectMapper = objectMapper; +this.taskInfoProvider = taskInfoProvider; +this.httpTimeout = httpTimeout; +this.retryPolicyFactory = initializeRetryPolicyFactory(numRetries); +this.executorService = MoreExecutors.listeningDecorator( +Execs.multiThreaded( +numThreads, +StringUtils.format( +"IndexTaskClient-%s-%%d", +callerId +) +) +); + } + + private static RetryPolicyFactory initializeRetryPolicyFactory(long numRetries) + { +// Retries [numRetries] times before giving up; this should be set long enough to handle any temporary +// unresponsiveness such as network issues, if