This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 53b6467  SeekableStreamIndexTaskRunner: Lazy init of runner. (#7729)
53b6467 is described below

commit 53b6467fc83cd4a78d87b5fd1557c84b2a5b2513
Author: Gian Merlino <g...@imply.io>
AuthorDate: Wed May 22 21:13:57 2019 -0700

    SeekableStreamIndexTaskRunner: Lazy init of runner. (#7729)
    
    The main motivation is that this fixes #7724, by making it so the overlord
    doesn't try to create a task runner and parser when all it really wants to
    do is create a task object and serialize it.
---
 .../druid/indexing/kafka/KafkaIndexTask.java       |  6 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 67 ++++++++++++++++------
 .../druid/indexing/kinesis/KinesisIndexTask.java   |  3 +-
 .../seekablestream/SeekableStreamIndexTask.java    | 32 ++++++-----
 4 files changed, 76 insertions(+), 32 deletions(-)
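
In short: before this change, the SeekableStreamIndexTask constructor eagerly called createTaskRunner(), which in turn built the parser, so even constructing a task object on the overlord (e.g. to serialize it) did that work and could fail, per #7724. After the change, the runner is built on first use via double-checked locking. A minimal sketch of that pattern, using hypothetical stand-in names (LazyTask, Runner) rather than the actual Druid classes:

    // Sketch only: LazyTask and Runner are illustrative stand-ins.
    public class LazyTask
    {
      static class Runner
      {
        Runner()
        {
          // Imagine expensive setup here (e.g. building a parser) that must not
          // run when a task is merely constructed and serialized.
        }
      }

      private final Object runnerInitLock = new Object();

      // volatile makes double-checked locking safe: other threads are guaranteed
      // to see a fully constructed Runner, never a partially initialized one.
      private volatile Runner runner;

      public Runner getRunner()
      {
        if (runner == null) {
          synchronized (runnerInitLock) {
            if (runner == null) {
              runner = new Runner(); // deferred until first use
            }
          }
        }
        return runner;
      }
    }

The first null check avoids taking the lock on the hot path; the second, inside the synchronized block, prevents two threads from each constructing a Runner.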

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index df56cce..314f99c 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -131,18 +131,20 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
   {
     if (context != null && context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null
         && ((boolean) context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) {
+      //noinspection unchecked
       return new IncrementalPublishingKafkaIndexTaskRunner(
           this,
-          parser,
+          dataSchema.getParser(),
           authorizerMapper,
           chatHandlerProvider,
           savedParseExceptions,
           rowIngestionMetersFactory
       );
     } else {
+      //noinspection unchecked
       return new LegacyKafkaIndexTaskRunner(
           this,
-          parser,
+          dataSchema.getParser(),
           authorizerMapper,
           chatHandlerProvider,
           savedParseExceptions,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 6b9a203..4c9b39b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -195,7 +195,7 @@ import static org.apache.druid.query.QueryPlus.wrap;
 public class KafkaIndexTaskTest
 {
   private static final Logger log = new Logger(KafkaIndexTaskTest.class);
-  private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+  private static final ObjectMapper OBJECT_MAPPER = new TestUtils().getTestObjectMapper();
   private static final long POLL_RETRY_MS = 100;
 
   private static TestingCluster zkServer;
@@ -204,6 +204,10 @@ public class KafkaIndexTaskTest
   private static ListeningExecutorService taskExec;
   private static int topicPostfix;
 
+  static {
+    new KafkaIndexTaskModule().getJacksonModules().forEach(OBJECT_MAPPER::registerModule);
+  }
+
   private final List<Task> runningTasks = new ArrayList<>();
 
   private long handoffConditionTimeout = 0;
@@ -244,7 +248,7 @@ public class KafkaIndexTaskTest
 
   private static final DataSchema DATA_SCHEMA = new DataSchema(
       "test_ds",
-      objectMapper.convertValue(
+      OBJECT_MAPPER.convertValue(
           new StringInputRowParser(
               new JSONParseSpec(
                   new TimestampSpec("timestamp", "iso", null),
@@ -272,7 +276,7 @@ public class KafkaIndexTaskTest
       },
       new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
       null,
-      objectMapper
+      OBJECT_MAPPER
   );
 
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
@@ -730,10 +734,11 @@ public class KafkaIndexTaskTest
     SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
     Assert.assertEquals(
-          new KafkaDataSourceMetadata(
-              new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
-          ),
-          metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
+        new KafkaDataSourceMetadata(
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
+        ),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
 
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
     Assert.assertEquals(
@@ -2011,7 +2016,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(2, countEvents(task));
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
 
-    Map<Integer, Long> currentOffsets = objectMapper.readValue(
+    Map<Integer, Long> currentOffsets = OBJECT_MAPPER.readValue(
         task.getRunner().pause().getEntity().toString(),
         new TypeReference<Map<Integer, Long>>()
         {
@@ -2147,7 +2152,7 @@ public class KafkaIndexTaskTest
     final Map<String, Object> context = new HashMap<>();
     context.put(
         SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
-        objectMapper.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+        OBJECT_MAPPER.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
     );
 
     final KafkaIndexTask task = createTask(
@@ -2267,7 +2272,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
 
     //verify the 2 indexed records
-    final QuerySegmentSpec firstInterval = objectMapper.readValue(
+    final QuerySegmentSpec firstInterval = OBJECT_MAPPER.readValue(
         "\"2008/2010\"", QuerySegmentSpec.class
     );
     Iterable<ScanResultValue> scanResultValues = scanData(task, firstInterval);
@@ -2287,7 +2292,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(2, countEvents(task));
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
 
-    final QuerySegmentSpec rollbackedInterval = objectMapper.readValue(
+    final QuerySegmentSpec rollbackedInterval = OBJECT_MAPPER.readValue(
         "\"2010/2012\"", QuerySegmentSpec.class
     );
     scanResultValues = scanData(task, rollbackedInterval);
@@ -2304,7 +2309,7 @@ public class KafkaIndexTaskTest
       kafkaProducer.commitTransaction();
     }
 
-    final QuerySegmentSpec endInterval = objectMapper.readValue(
+    final QuerySegmentSpec endInterval = OBJECT_MAPPER.readValue(
         "\"2008/2049\"", QuerySegmentSpec.class
     );
     Iterable<ScanResultValue> scanResultValues1 = scanData(task, endInterval);
@@ -2388,6 +2393,36 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
   }
 
+  @Test
+  public void testSerde() throws Exception
+  {
+    // This is both a serde test and a regression test for https://github.com/apache/incubator-druid/issues/7724.
+
+    final KafkaIndexTask task = createTask(
+        "taskid",
+        DATA_SCHEMA.withTransformSpec(
+            new TransformSpec(
+                null,
+                ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil()))
+            )
+        ),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence",
+            new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(), ImmutableSet.of()),
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of()),
+            ImmutableMap.of(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final Task task1 = OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsBytes(task), Task.class);
+    Assert.assertEquals(task, task1);
+  }
+
   private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
   {
     ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
@@ -2513,7 +2548,7 @@ public class KafkaIndexTaskTest
       if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
         final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
         checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
-        final String checkpointsJson = objectMapper
+        final String checkpointsJson = OBJECT_MAPPER
             .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
             .writeValueAsString(checkpoints);
         context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
@@ -2530,7 +2565,7 @@ public class KafkaIndexTaskTest
         null,
         null,
         rowIngestionMetersFactory,
-        objectMapper
+        OBJECT_MAPPER
     );
     task.setPollRetryMs(POLL_RETRY_MS);
     return task;
@@ -2544,7 +2579,7 @@ public class KafkaIndexTaskTest
         dataSchema.getAggregators(),
         dataSchema.getGranularitySpec(),
         dataSchema.getTransformSpec(),
-        objectMapper
+        OBJECT_MAPPER
     );
   }
 
@@ -2861,7 +2896,7 @@ public class KafkaIndexTaskTest
 
   private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException
   {
-    Map<String, TaskReport> taskReports = objectMapper.readValue(
+    Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
         reportsFile,
         new TypeReference<Map<String, TaskReport>>()
         {
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index ff1847a..f3dfe3b 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -72,9 +72,10 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
   @Override
   protected SeekableStreamIndexTaskRunner<String, String> createTaskRunner()
   {
+    //noinspection unchecked
     return new KinesisIndexTaskRunner(
         this,
-        parser,
+        dataSchema.getParser(),
         authorizerMapper,
         chatHandlerProvider,
         savedParseExceptions,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 833cc17..37d472b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -25,7 +25,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
@@ -57,7 +56,6 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.utils.CircularBuffer;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Map;
 
 
@@ -67,9 +65,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
   private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
 
-  private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
   protected final DataSchema dataSchema;
-  protected final InputRowParser<ByteBuffer> parser;
   protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
   protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
   protected final Optional<ChatHandlerProvider> chatHandlerProvider;
@@ -78,6 +74,12 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   protected final RowIngestionMetersFactory rowIngestionMetersFactory;
   protected final CircularBuffer<Throwable> savedParseExceptions;
 
+  // Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
+  // See https://github.com/apache/incubator-druid/issues/7724 for issues that can cause.
+  // By the way, lazily init is synchronized because the runner may be needed in multiple threads.
+  private final Object runnerInitLock = new Object();
+  private volatile SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
+
   public SeekableStreamIndexTask(
       final String id,
       @Nullable final TaskResource taskResource,
@@ -99,7 +101,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
         context
     );
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
-    this.parser = Preconditions.checkNotNull((InputRowParser<ByteBuffer>) dataSchema.getParser(), "parser");
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
     this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
@@ -111,7 +112,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
     this.context = context;
     this.authorizerMapper = authorizerMapper;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
-    this.runner = createTaskRunner();
   }
 
   private static String makeTaskId(String dataSource, String type)
@@ -130,7 +130,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
     return StringUtils.format("%s_%s", type, dataSource);
   }
 
-
   @Override
   public int getPriority()
   {
@@ -164,7 +163,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   @Override
   public TaskStatus run(final TaskToolbox toolbox)
   {
-    return runner.run(toolbox);
+    return getRunner().run(toolbox);
   }
 
   @Override
@@ -177,19 +176,19 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   public void stopGracefully(TaskConfig taskConfig)
   {
     if (taskConfig.isRestoreTasksOnRestart()) {
-      runner.stopGracefully();
+      getRunner().stopGracefully();
     }
   }
 
   @Override
   public <T> QueryRunner<T> getQueryRunner(Query<T> query)
   {
-    if (runner.getAppenderator() == null) {
+    if (getRunner().getAppenderator() == null) {
       // Not yet initialized, no data yet, just return a noop runner.
       return new NoopQueryRunner<>();
     }
 
-    return (queryPlus, responseContext) -> queryPlus.run(runner.getAppenderator(), responseContext);
+    return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
   }
 
   public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
@@ -283,13 +282,20 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   @VisibleForTesting
   public Appenderator getAppenderator()
   {
-    return runner.getAppenderator();
+    return getRunner().getAppenderator();
   }
 
   @VisibleForTesting
   public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner()
   {
+    if (runner == null) {
+      synchronized (runnerInitLock) {
+        if (runner == null) {
+          runner = createTaskRunner();
+        }
+      }
+    }
+
     return runner;
   }
-
 }
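
For reference, the testSerde addition above is a standard Jackson round-trip check: write the task to bytes, read it back, and assert equality. A self-contained sketch of the same pattern, using a plain stand-in POJO (MyTask) instead of a real KafkaIndexTask:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.junit.Assert;
    import org.junit.Test;

    import java.util.Objects;

    public class RoundTripSerdeTest
    {
      // Stand-in for a real task; the actual test round-trips KafkaIndexTask.
      static class MyTask
      {
        public String id;
        public int priority;

        @Override
        public boolean equals(Object o)
        {
          return o instanceof MyTask
                 && Objects.equals(id, ((MyTask) o).id)
                 && priority == ((MyTask) o).priority;
        }

        @Override
        public int hashCode()
        {
          return Objects.hash(id, priority);
        }
      }

      @Test
      public void testSerde() throws Exception
      {
        final ObjectMapper mapper = new ObjectMapper();

        final MyTask task = new MyTask();
        task.id = "taskid";
        task.priority = 75;

        // readValue() runs the deserializing constructor, so any eager
        // constructor-time work (like building a runner) would execute here.
        final MyTask task1 = mapper.readValue(mapper.writeValueAsBytes(task), MyTask.class);
        Assert.assertEquals(task, task1);
      }
    }

Round-tripping through bytes exercises both serialization and the deserializing constructor, which is why the commit's test doubles as a regression test for #7724: before the fix, constructing a task (including via deserialization) eagerly built the runner and parser.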

