reswqa commented on code in PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1427487497
########## flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java: ########## @@ -99,7 +119,120 @@ void testThrowIfSetInvalidTimeouts() { .isInstanceOf(IllegalStateException.class); } + @Test + void testOverrideFailureHandler() { + final FailureHandler failureHandler = (failure) -> {}; + final ElasticsearchSink<Object> sink = + createMinimalBuilder().setFailureHandler(failureHandler).build(); + + final InitContext sinkInitContext = new MockInitContext(); + final BulkResponseInspector bulkResponseInspector = + sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup); + assertThat(bulkResponseInspector) + .isInstanceOf(DefaultBulkResponseInspector.class) + .extracting( + (inspector) -> ((DefaultBulkResponseInspector) inspector).failureHandler) + .isEqualTo(failureHandler); + } + + @Test + void testOverrideBulkResponseInspectorFactory() { + final AtomicBoolean called = new AtomicBoolean(); + final BulkResponseInspectorFactory bulkResponseInspectorFactory = + initContext -> { + final MetricGroup metricGroup = initContext.metricGroup(); + metricGroup.addGroup("bulk").addGroup("result", "failed").counter("actions"); + called.set(true); + return (BulkResponseInspector) (request, response) -> {}; + }; + final ElasticsearchSink<Object> sink = + createMinimalBuilder() + .setBulkResponseInspectorFactory(bulkResponseInspectorFactory) + .build(); + + final InitContext sinkInitContext = new MockInitContext(); + + assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException(); + assertThat(called).isTrue(); + } + abstract B createEmptyBuilder(); abstract B createMinimalBuilder(); + + private static class DummyMailboxExecutor implements MailboxExecutor { + private DummyMailboxExecutor() {} + + public void execute( + ThrowingRunnable<? extends Exception> command, + String descriptionFormat, + Object... 
descriptionArgs) {} + + public void yield() throws InterruptedException, FlinkRuntimeException {} + + public boolean tryYield() throws FlinkRuntimeException { + return false; + } + } + + private static class MockInitContext + implements Sink.InitContext, SerializationSchema.InitializationContext { + + public UserCodeClassLoader getUserCodeClassLoader() { + return SimpleUserCodeClassLoader.create( + ElasticsearchSinkBuilderBaseTest.class.getClassLoader()); + } + + public MailboxExecutor getMailboxExecutor() { + return new ElasticsearchSinkBuilderBaseTest.DummyMailboxExecutor(); + } + + public ProcessingTimeService getProcessingTimeService() { + return new TestProcessingTimeService(); + } + + public int getSubtaskId() { + return 0; + } + + public int getNumberOfParallelSubtasks() { + return 0; + } + + public int getAttemptNumber() { + return 0; + } + + public SinkWriterMetricGroup metricGroup() { Review Comment: We should also register the `IOMetricGroup`; otherwise it will throw an NPE. ```java public SinkWriterMetricGroup metricGroup() { final OperatorIOMetricGroup operatorIOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup() .getIOMetricGroup(); return InternalSinkWriterMetricGroup.wrap( new TestingSinkWriterMetricGroup.Builder() .setParentMetricGroup(new UnregisteredMetricsGroup()) .setIoMetricGroupSupplier(() -> operatorIOMetricGroup) .build()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org