[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r343357969

File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -66,6 +67,18 @@ public KafkaRecordSupplier(
     this.consumer = getKafkaConsumer();
   }

+  @VisibleForTesting

Review comment:
Cannot reference `KafkaRecordSupplier.getKafkaConsumer()` before the supertype constructor has been called.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
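In Java, the arguments of an explicit `this(...)` or `super(...)` call may not refer to instance methods or fields, because the object is not yet initialized; that is what this compiler error reports. A common workaround, sketched here under the assumption that the consumer-building logic needs no instance state, is to make the helper `static` and pass it everything explicitly. `ConsumerHolder`, `buildConsumer`, and `withOverrides` are hypothetical names, and `java.util.Properties` stands in for a real `KafkaConsumer` so the sketch runs without Kafka on the classpath.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for KafkaRecordSupplier; Properties replaces KafkaConsumer.
class ConsumerHolder
{
  private final Map<String, Object> consumerProperties;
  private final Properties consumer;

  // Production-style constructor: no extra consumer configs.
  ConsumerHolder(Map<String, Object> consumerProperties)
  {
    // Legal only because buildConsumer is static; an instance method referenced
    // here would be rejected with "cannot reference ... before supertype
    // constructor has been called".
    this(consumerProperties, buildConsumer(consumerProperties, Collections.emptyMap()));
  }

  // Canonical constructor that every path funnels through.
  private ConsumerHolder(Map<String, Object> consumerProperties, Properties consumer)
  {
    this.consumerProperties = consumerProperties;
    this.consumer = consumer;
  }

  // Test-style entry point: extra configs are layered on top of the base ones.
  static ConsumerHolder withOverrides(Map<String, Object> consumerProperties, Map<String, Object> overrides)
  {
    return new ConsumerHolder(consumerProperties, buildConsumer(consumerProperties, overrides));
  }

  // Uses only its parameters, so it may appear in a this(...) argument list.
  static Properties buildConsumer(Map<String, Object> base, Map<String, Object> overrides)
  {
    Properties props = new Properties();
    props.putAll(base);
    props.putAll(overrides); // later puts win, so overrides take precedence
    return props;
  }

  Map<String, Object> getConsumerProperties()
  {
    return consumerProperties;
  }

  Properties getConsumer()
  {
    return consumer;
  }
}
```

With this shape, a `@VisibleForTesting` entry point could delegate to the shared constructor once the property-building helper no longer touches instance fields.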
[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342776259

File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer getKafkaConsumer(Map consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map consumerProperties,
+      ObjectMapper sortingMapper,
+      Map consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }

Review comment:
`getKafkaConsumer` depends on another private method, `addConsumerPropertiesFromConfig`. Should I move both methods to the test file?
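Copying `getKafkaConsumer` and `addConsumerPropertiesFromConfig` into the test is not the only way to let a test build the supplier with extra configs. One alternative, sketched below under stated assumptions, is a constructor that accepts a factory for the consumer, so the private property-building code stays in one place and the test supplies its own consumer. `RecordSupplierSketch` and its generic consumer type `C` are hypothetical; the factory takes `Properties` because that is what a `KafkaConsumer` constructor accepts.

```java
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

// Hypothetical, simplified stand-in for KafkaRecordSupplier.
class RecordSupplierSketch<C>
{
  private final Map<String, Object> consumerProperties;
  private final C consumer;

  // Callers pass the factory (a real consumer in production, a mock in tests),
  // so the private property-building code is not duplicated anywhere.
  RecordSupplierSketch(Map<String, Object> consumerProperties, Function<Properties, C> consumerFactory)
  {
    this.consumerProperties = consumerProperties;
    this.consumer = consumerFactory.apply(buildProperties(consumerProperties));
  }

  // Stays private: callers only ever see the finished Properties via the factory.
  private static Properties buildProperties(Map<String, Object> consumerProperties)
  {
    Properties props = new Properties();
    props.putAll(consumerProperties);
    return props;
  }

  Map<String, Object> getConsumerProperties()
  {
    return consumerProperties;
  }

  C getConsumer()
  {
    return consumer;
  }
}
```

In production the factory might be `props -> new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)`; whether this beats a `@VisibleForTesting` constructor is a design judgment call.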
[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342773057

File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer getKafkaConsumer(Map consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map consumerProperties,
+      ObjectMapper sortingMapper,
+      Map consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }

Review comment:
`getKafkaConsumer()` is a private method, and it depends on other parameters.
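`getKafkaConsumer` temporarily installs the extension's own class loader as the thread context class loader while the deserializers and consumer are created, then restores the previous loader in `finally` so the swap cannot leak to other code running on the same thread. That save/set/restore pattern can be factored into a small helper, sketched below; `ClassLoaders.withContextClassLoader` is a hypothetical name, not an existing Druid or Kafka API.

```java
import java.util.function.Supplier;

// Hypothetical utility; not part of Druid or Kafka.
final class ClassLoaders
{
  private ClassLoaders()
  {
  }

  // Runs action with loader installed as the current thread's context class loader,
  // restoring the previous loader afterwards, even if action throws.
  static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action)
  {
    final Thread thread = Thread.currentThread();
    final ClassLoader previous = thread.getContextClassLoader();
    try {
      thread.setContextClassLoader(loader);
      return action.get();
    }
    finally {
      thread.setContextClassLoader(previous);
    }
  }
}
```

Inside `getKafkaConsumer`, the `try`/`finally` could then become a single call passing `getClass().getClassLoader()` and a lambda that performs the deserializer lookups and `new KafkaConsumer<>(...)`, keeping those lookups under the swapped loader as the original does.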
[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342776259

File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer getKafkaConsumer(Map consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map consumerProperties,
+      ObjectMapper sortingMapper,
+      Map consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }

Review comment:
`getKafkaConsumer` depends on another private method, `addConsumerPropertiesFromConfig`. Should I copy both methods to the test file?
[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342776259

File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer getKafkaConsumer(Map consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map consumerProperties,
+      ObjectMapper sortingMapper,
+      Map consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }

Review comment:
Also, `getKafkaConsumer` depends on another private method, `addConsumerPropertiesFromConfig`. Should I copy both methods to the test file?
[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342773057

File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer getKafkaConsumer(Map consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map consumerProperties,
+      ObjectMapper sortingMapper,
+      Map consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }

Review comment:
`getKafkaConsumer()` is a private method.
[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342773057

File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -259,4 +260,36 @@ private static void wrapExceptions(Runnable runnable)
       return null;
     });
   }
+
+  @VisibleForTesting
+  private KafkaConsumer getKafkaConsumer(Map consumerConfigs)
+  {
+    final Properties props = new Properties();
+    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    props.putAll(consumerConfigs);
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
+      Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
+
+      return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map consumerProperties,
+      ObjectMapper sortingMapper,
+      Map consumerConfigs
+  )
+  {
+    this.consumerProperties = consumerProperties;
+    this.sortingMapper = sortingMapper;
+    this.consumer = getKafkaConsumer(consumerConfigs);
+  }

Review comment:
`getKafkaConsumer()` is a private method.
[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342334184

File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -585,6 +589,108 @@ public void testLatestOffset() throws Exception
     );
   }

+  /**
+   * Test if partitionIds get updated
+   */
+  @Test
+  public void testPartitionIdsUpdates() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(1100);
+
+    Capture captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    Assert.assertFalse(supervisor.isPartitionIdsEmpty());
+  }
+
+
+  /**
+   * Test For if always use earliest offset on newly discovered partitions
+   */
+  @Ignore("This is a regression test that needs to wait 10s+, ignore for now")
+  @Test
+  public void testLatestOffsetOnDiscovery() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(9);
+
+    Capture captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    KafkaIndexTask task = captured.getValue();
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2).longValue()
+    );
+
+    addMoreEvents(9, 6);
+    Thread.sleep(1);

Review comment:
https://github.com/apache/incubator-druid/pull/8748#issuecomment-548955564
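The ignored test relies on `Thread.sleep` to give the supervisor time to notice newly added partitions, which is why it is marked as needing 10s+. A bounded poll that re-checks a condition usually makes such tests both faster and less flaky. The sketch below uses only the JDK; `Await.waitUntil` is a hypothetical helper, not part of Druid's test utilities.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical test helper; not part of Druid's test utilities.
final class Await
{
  private Await()
  {
  }

  // Re-checks condition every pollInterval until it holds or timeout elapses.
  // Returns true as soon as the condition is met, false on timeout or interrupt.
  static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
  {
    final long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }
}
```

The test would then assert on `Await.waitUntil(...)` instead of sleeping for a fixed time; which condition to poll on (for example, whether a later task starts the new partitions at the earliest offset) is an assumption left to the test author.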
[GitHub] [incubator-druid] SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
SEKIRO-J commented on a change in pull request #8748: Use earliest offset on kafka newly discovered partitions
URL: https://github.com/apache/incubator-druid/pull/8748#discussion_r342334184

File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -585,6 +589,108 @@ public void testLatestOffset() throws Exception
     );
   }

+  /**
+   * Test if partitionIds get updated
+   */
+  @Test
+  public void testPartitionIdsUpdates() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(1100);
+
+    Capture captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    Assert.assertFalse(supervisor.isPartitionIdsEmpty());
+  }
+
+
+  /**
+   * Test For if always use earliest offset on newly discovered partitions
+   */
+  @Ignore("This is a regression test that needs to wait 10s+, ignore for now")
+  @Test
+  public void testLatestOffsetOnDiscovery() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(9);
+
+    Capture captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    KafkaIndexTask task = captured.getValue();
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2).longValue()
+    );
+
+    addMoreEvents(9, 6);
+    Thread.sleep(1);

Review comment:
https://github.com/apache/incubator-druid/pull/8748#discussion_r342276353
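The three near-identical `Assert.assertEquals` calls check that partitions 0, 1, and 2 all start at offset 10. A loop over the expected partitions expresses the same check more compactly and reports which partition failed. In this sketch a plain `Map<Integer, Long>` stands in for the result of `getPartitionSequenceNumberMap()`, and `OffsetChecks.checkStartOffsets` is a hypothetical helper.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper; the map stands in for getPartitionSequenceNumberMap().
final class OffsetChecks
{
  private OffsetChecks()
  {
  }

  // Throws AssertionError naming the first partition whose start offset
  // is missing or differs from expected.
  static void checkStartOffsets(Map<Integer, Long> startOffsets, List<Integer> partitions, long expected)
  {
    for (int partition : partitions) {
      Long actual = startOffsets.get(partition);
      if (actual == null || actual != expected) {
        throw new AssertionError("partition " + partition + ": expected " + expected + " but was " + actual);
      }
    }
  }
}
```

In the test this could read `OffsetChecks.checkStartOffsets(task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(), List.of(0, 1, 2), 10L)`, assuming that map's key and value types match.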