reswqa commented on code in PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1427487497


##########
flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java:
##########
@@ -99,7 +119,120 @@ void testThrowIfSetInvalidTimeouts() {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testOverrideFailureHandler() {
+        final FailureHandler failureHandler = (failure) -> {};
+        final ElasticsearchSink<Object> sink =
+                
createMinimalBuilder().setFailureHandler(failureHandler).build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+        final BulkResponseInspector bulkResponseInspector =
+                
sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+        assertThat(bulkResponseInspector)
+                .isInstanceOf(DefaultBulkResponseInspector.class)
+                .extracting(
+                        (inspector) -> ((DefaultBulkResponseInspector) 
inspector).failureHandler)
+                .isEqualTo(failureHandler);
+    }
+
+    @Test
+    void testOverrideBulkResponseInspectorFactory() {
+        final AtomicBoolean called = new AtomicBoolean();
+        final BulkResponseInspectorFactory bulkResponseInspectorFactory =
+                initContext -> {
+                    final MetricGroup metricGroup = initContext.metricGroup();
+                    metricGroup.addGroup("bulk").addGroup("result", 
"failed").counter("actions");
+                    called.set(true);
+                    return (BulkResponseInspector) (request, response) -> {};
+                };
+        final ElasticsearchSink<Object> sink =
+                createMinimalBuilder()
+                        
.setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
+                        .build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+
+        assertThatCode(() -> 
sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
+        assertThat(called).isTrue();
+    }
+
     abstract B createEmptyBuilder();
 
     abstract B createMinimalBuilder();
+
+    private static class DummyMailboxExecutor implements MailboxExecutor {
+        private DummyMailboxExecutor() {}
+
+        public void execute(
+                ThrowingRunnable<? extends Exception> command,
+                String descriptionFormat,
+                Object... descriptionArgs) {}
+
+        public void yield() throws InterruptedException, FlinkRuntimeException 
{}
+
+        public boolean tryYield() throws FlinkRuntimeException {
+            return false;
+        }
+    }
+
+    private static class MockInitContext
+            implements Sink.InitContext, 
SerializationSchema.InitializationContext {
+
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return SimpleUserCodeClassLoader.create(
+                    ElasticsearchSinkBuilderBaseTest.class.getClassLoader());
+        }
+
+        public MailboxExecutor getMailboxExecutor() {
+            return new ElasticsearchSinkBuilderBaseTest.DummyMailboxExecutor();
+        }
+
+        public ProcessingTimeService getProcessingTimeService() {
+            return new TestProcessingTimeService();
+        }
+
+        public int getSubtaskId() {
+            return 0;
+        }
+
+        public int getNumberOfParallelSubtasks() {
+            return 0;
+        }
+
+        public int getAttemptNumber() {
+            return 0;
+        }
+
+        public SinkWriterMetricGroup metricGroup() {

Review Comment:
   We should also register the IOMetricGroup.
   
   ```java
   public SinkWriterMetricGroup metricGroup() {
       final OperatorIOMetricGroup operatorIOMetricGroup =
               UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
       return InternalSinkWriterMetricGroup.wrap(
               new TestingSinkWriterMetricGroup.Builder()
                       .setParentMetricGroup(new UnregisteredMetricsGroup())
                       .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
                       .build());
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to