[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions

2019-11-06 Thread GitBox
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on 
kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r343357969
 
 

 ##
 File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
 ##
 @@ -66,6 +67,18 @@ public KafkaRecordSupplier(
     this.consumer = getKafkaConsumer();
   }
 
+  @VisibleForTesting
 
 Review comment:
   Cannot reference `KafkaRecordSupplier.getKafkaConsumer()` before the supertype constructor has been called.
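   For context, a minimal sketch of the compiler restriction quoted above (the class and
   method names here are hypothetical, not taken from the PR): an instance method cannot be
   used while delegating to another constructor, because the object is not initialized yet.

     class RecordSupplierExample
     {
       private final Object consumer;

       RecordSupplierExample(Object consumer)
       {
         this.consumer = consumer;
       }

       RecordSupplierExample()
       {
         // this(buildConsumer());  // would NOT compile:
         // "cannot reference buildConsumer() before supertype constructor has been called"
         this(buildConsumerStatically());
       }

       // Instance method: not usable inside a this()/super() argument list.
       private Object buildConsumer()
       {
         return new Object();
       }

       // Static helper: fine, because it does not touch the not-yet-constructed instance.
       private static Object buildConsumerStatically()
       {
         return new Object();
       }
     }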


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions

2019-11-05 Thread GitBox
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on 
kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342776259
 
 

 ##
 File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
 ##
 @@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer<byte[], byte[]> getKafkaConsumer(Map<String, Object> consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map<String, Object> consumerProperties,
+      ObjectMapper sortingMapper,
+      Map<String, Object> consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }
 
 Review comment:
`getKafkaConsumer` depends on another private method, `addConsumerPropertiesFromConfig`. Move both methods to the test file?
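
One alternative that avoids duplicating private helpers in tests (a sketch only; the class
name and this injected-consumer constructor are hypothetical, not what the PR does): let the
test hand in an already-built `KafkaConsumer`, so `getKafkaConsumer` and
`addConsumerPropertiesFromConfig` can stay private.

  import com.fasterxml.jackson.databind.ObjectMapper;
  import com.google.common.annotations.VisibleForTesting;
  import java.util.Map;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class KafkaRecordSupplierSketch
  {
    private final Map<String, Object> consumerProperties;
    private final ObjectMapper sortingMapper;
    private final KafkaConsumer<byte[], byte[]> consumer;

    // Tests construct (or mock) the consumer themselves and inject it here.
    @VisibleForTesting
    KafkaRecordSupplierSketch(
        Map<String, Object> consumerProperties,
        ObjectMapper sortingMapper,
        KafkaConsumer<byte[], byte[]> consumer
    )
    {
      this.consumerProperties = consumerProperties;
      this.sortingMapper = sortingMapper;
      this.consumer = consumer;
    }
  }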



[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions

2019-11-05 Thread GitBox
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on 
kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342773057
 
 

 ##
 File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
 ##
 @@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer<byte[], byte[]> getKafkaConsumer(Map<String, Object> consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map<String, Object> consumerProperties,
+      ObjectMapper sortingMapper,
+      Map<String, Object> consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }
 
 Review comment:
   `getKafkaConsumer()` is a private method, and it depends on other parameters.



[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions

2019-11-05 Thread GitBox
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on 
kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342776259
 
 

 ##
 File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
 ##
 @@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer<byte[], byte[]> getKafkaConsumer(Map<String, Object> consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map<String, Object> consumerProperties,
+      ObjectMapper sortingMapper,
+      Map<String, Object> consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }
 
 Review comment:
`getKafkaConsumer` depends on another private method, `addConsumerPropertiesFromConfig`. Copy both methods to the test file?



[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions

2019-11-05 Thread GitBox
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on 
kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342776259
 
 

 ##
 File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
 ##
 @@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer<byte[], byte[]> getKafkaConsumer(Map<String, Object> consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map<String, Object> consumerProperties,
+      ObjectMapper sortingMapper,
+      Map<String, Object> consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }
 
 Review comment:
   And `getKafkaConsumer` depends on another private method, `addConsumerPropertiesFromConfig`. Copy both methods to the test file?



[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions

2019-11-05 Thread GitBox
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on 
kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342773057
 
 

 ##
 File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
 ##
 @@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer<byte[], byte[]> getKafkaConsumer(Map<String, Object> consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map<String, Object> consumerProperties,
+      ObjectMapper sortingMapper,
+      Map<String, Object> consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }
 
 Review comment:
   `getKafkaConsumer()` is a private method.
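
   As a general note (a convention, not a claim about what this PR finally does):
   `@VisibleForTesting` is usually paired with package-private visibility, because a `private`
   member remains unreachable from the test class regardless of the annotation. A tiny
   hypothetical example:

     import com.google.common.annotations.VisibleForTesting;

     public class VisibilityExample
     {
       // Package-private: tests in the same package can call it directly,
       // while it stays out of the public API surface.
       @VisibleForTesting
       static String buildGroupId(String prefix)
       {
         return prefix + "-consumer";
       }
     }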



[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions

2019-11-05 Thread GitBox
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on 
kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342773057
 
 

 ##
 File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
 ##
 @@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer<byte[], byte[]> getKafkaConsumer(Map<String, Object> consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map<String, Object> consumerProperties,
+      ObjectMapper sortingMapper,
+      Map<String, Object> consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }
 
 Review comment:
   `getKafkaConsumer()` is a private method.



[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions

2019-11-04 Thread GitBox
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on 
kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342334184
 
 

 ##
 File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 ##
 @@ -585,6 +589,108 @@ public void testLatestOffset() throws Exception
     );
   }
 
+  /**
+   * Test if partitionIds get updated
+   */
+  @Test
+  public void testPartitionIdsUpdates() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(1100);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    Assert.assertFalse(supervisor.isPartitionIdsEmpty());
+  }
+
+
+  /**
+   * Test that the earliest offset is always used on newly discovered partitions
+   */
+  @Ignore("This is a regression test that needs to wait 10s+, ignore for now")
+  @Test
+  public void testLatestOffsetOnDiscovery() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(9);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    KafkaIndexTask task = captured.getValue();
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2).longValue()
+    );
+
+    addMoreEvents(9, 6);
+    Thread.sleep(1);
 
 Review comment:
   https://github.com/apache/incubator-druid/pull/8748#issuecomment-548955564
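
   (The `@Ignore` note above says this test would need to wait 10s+ for the Kafka metadata
   refresh. One common way to bound such a wait without a hard-coded sleep is to poll for the
   condition with a timeout; the helper below and the `supervisor.getPartitionCount()` call in
   the usage comment are hypothetical, not part of the PR.)

     // Poll a condition until it holds or a deadline passes, instead of a fixed Thread.sleep().
     static void awaitCondition(java.util.function.BooleanSupplier condition, long timeoutMillis)
         throws InterruptedException
     {
       final long deadline = System.currentTimeMillis() + timeoutMillis;
       while (!condition.getAsBoolean()) {
         if (System.currentTimeMillis() > deadline) {
           throw new AssertionError("condition not met within " + timeoutMillis + " ms");
         }
         Thread.sleep(100);
       }
     }

     // Hypothetical usage in the test, once the partition count should have grown to 6:
     // awaitCondition(() -> supervisor.getPartitionCount() == 6, 15_000);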



[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions

2019-11-04 Thread GitBox
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on 
kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342334184
 
 

 ##
 File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 ##
 @@ -585,6 +589,108 @@ public void testLatestOffset() throws Exception
     );
   }
 
+  /**
+   * Test if partitionIds get updated
+   */
+  @Test
+  public void testPartitionIdsUpdates() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(1100);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    Assert.assertFalse(supervisor.isPartitionIdsEmpty());
+  }
+
+
+  /**
+   * Test that the earliest offset is always used on newly discovered partitions
+   */
+  @Ignore("This is a regression test that needs to wait 10s+, ignore for now")
+  @Test
+  public void testLatestOffsetOnDiscovery() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(9);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    KafkaIndexTask task = captured.getValue();
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2).longValue()
+    );
+
+    addMoreEvents(9, 6);
+    Thread.sleep(1);
 
 Review comment:
   https://github.com/apache/incubator-druid/pull/8748#discussion_r342276353

