Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-15 Thread via GitHub


reswqa commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1857562853

   Thanks @schulzp, nice one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-15 Thread via GitHub


boring-cyborg[bot] commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1857562560

   Awesome work, congrats on your first merged pull request!
   





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-15 Thread via GitHub


reswqa merged PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-14 Thread via GitHub


reswqa commented on code in PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1427487497


##
flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java:
##
@@ -99,7 +119,120 @@ void testThrowIfSetInvalidTimeouts() {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testOverrideFailureHandler() {
+        final FailureHandler failureHandler = (failure) -> {};
+        final ElasticsearchSink sink =
+                createMinimalBuilder().setFailureHandler(failureHandler).build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+        final BulkResponseInspector bulkResponseInspector =
+                sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+        assertThat(bulkResponseInspector)
+                .isInstanceOf(DefaultBulkResponseInspector.class)
+                .extracting(
+                        (inspector) -> ((DefaultBulkResponseInspector) inspector).failureHandler)
+                .isEqualTo(failureHandler);
+    }
+
+    @Test
+    void testOverrideBulkResponseInspectorFactory() {
+        final AtomicBoolean called = new AtomicBoolean();
+        final BulkResponseInspectorFactory bulkResponseInspectorFactory =
+                initContext -> {
+                    final MetricGroup metricGroup = initContext.metricGroup();
+                    metricGroup.addGroup("bulk").addGroup("result", "failed").counter("actions");
+                    called.set(true);
+                    return (BulkResponseInspector) (request, response) -> {};
+                };
+        final ElasticsearchSink sink =
+                createMinimalBuilder()
+                        .setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
+                        .build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+
+        assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
+        assertThat(called).isTrue();
+    }
+
     abstract B createEmptyBuilder();
 
     abstract B createMinimalBuilder();
+
+    private static class DummyMailboxExecutor implements MailboxExecutor {
+        private DummyMailboxExecutor() {}
+
+        public void execute(
+                ThrowingRunnable command,
+                String descriptionFormat,
+                Object... descriptionArgs) {}
+
+        public void yield() throws InterruptedException, FlinkRuntimeException {}
+
+        public boolean tryYield() throws FlinkRuntimeException {
+            return false;
+        }
+    }
+
+    private static class MockInitContext
+            implements Sink.InitContext, SerializationSchema.InitializationContext {
+
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return SimpleUserCodeClassLoader.create(
+                    ElasticsearchSinkBuilderBaseTest.class.getClassLoader());
+        }
+
+        public MailboxExecutor getMailboxExecutor() {
+            return new ElasticsearchSinkBuilderBaseTest.DummyMailboxExecutor();
+        }
+
+        public ProcessingTimeService getProcessingTimeService() {
+            return new TestProcessingTimeService();
+        }
+
+        public int getSubtaskId() {
+            return 0;
+        }
+
+        public int getNumberOfParallelSubtasks() {
+            return 0;
+        }
+
+        public int getAttemptNumber() {
+            return 0;
+        }
+
+        public SinkWriterMetricGroup metricGroup() {

Review Comment:
   We should also register an `IOMetricGroup`; otherwise it will throw an NPE.
   
   ```java
   public SinkWriterMetricGroup metricGroup() {
       final OperatorIOMetricGroup operatorIOMetricGroup =
               UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
                       .getIOMetricGroup();
       return InternalSinkWriterMetricGroup.wrap(
               new TestingSinkWriterMetricGroup.Builder()
                       .setParentMetricGroup(new UnregisteredMetricsGroup())
                       .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
                       .build());
   }
   ```






Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-14 Thread via GitHub


reswqa commented on code in PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1427487497


##
flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java:
##
@@ -99,7 +119,120 @@ void testThrowIfSetInvalidTimeouts() {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testOverrideFailureHandler() {
+        final FailureHandler failureHandler = (failure) -> {};
+        final ElasticsearchSink sink =
+                createMinimalBuilder().setFailureHandler(failureHandler).build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+        final BulkResponseInspector bulkResponseInspector =
+                sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+        assertThat(bulkResponseInspector)
+                .isInstanceOf(DefaultBulkResponseInspector.class)
+                .extracting(
+                        (inspector) -> ((DefaultBulkResponseInspector) inspector).failureHandler)
+                .isEqualTo(failureHandler);
+    }
+
+    @Test
+    void testOverrideBulkResponseInspectorFactory() {
+        final AtomicBoolean called = new AtomicBoolean();
+        final BulkResponseInspectorFactory bulkResponseInspectorFactory =
+                initContext -> {
+                    final MetricGroup metricGroup = initContext.metricGroup();
+                    metricGroup.addGroup("bulk").addGroup("result", "failed").counter("actions");
+                    called.set(true);
+                    return (BulkResponseInspector) (request, response) -> {};
+                };
+        final ElasticsearchSink sink =
+                createMinimalBuilder()
+                        .setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
+                        .build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+
+        assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
+        assertThat(called).isTrue();
+    }
+
     abstract B createEmptyBuilder();
 
     abstract B createMinimalBuilder();
+
+    private static class DummyMailboxExecutor implements MailboxExecutor {
+        private DummyMailboxExecutor() {}
+
+        public void execute(
+                ThrowingRunnable command,
+                String descriptionFormat,
+                Object... descriptionArgs) {}
+
+        public void yield() throws InterruptedException, FlinkRuntimeException {}
+
+        public boolean tryYield() throws FlinkRuntimeException {
+            return false;
+        }
+    }
+
+    private static class MockInitContext
+            implements Sink.InitContext, SerializationSchema.InitializationContext {
+
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return SimpleUserCodeClassLoader.create(
+                    ElasticsearchSinkBuilderBaseTest.class.getClassLoader());
+        }
+
+        public MailboxExecutor getMailboxExecutor() {
+            return new ElasticsearchSinkBuilderBaseTest.DummyMailboxExecutor();
+        }
+
+        public ProcessingTimeService getProcessingTimeService() {
+            return new TestProcessingTimeService();
+        }
+
+        public int getSubtaskId() {
+            return 0;
+        }
+
+        public int getNumberOfParallelSubtasks() {
+            return 0;
+        }
+
+        public int getAttemptNumber() {
+            return 0;
+        }
+
+        public SinkWriterMetricGroup metricGroup() {

Review Comment:
   We should also register an IOMetricGroup; otherwise it will throw an NPE.
   
   ```java
   public SinkWriterMetricGroup metricGroup() {
       final OperatorIOMetricGroup operatorIOMetricGroup =
               UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
       return InternalSinkWriterMetricGroup.wrap(
               new TestingSinkWriterMetricGroup.Builder()
                       .setParentMetricGroup(new UnregisteredMetricsGroup())
                       .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
                       .build());
   }
   ```



##
flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java:
##
@@ -99,7 +119,120 @@ void testThrowIfSetInvalidTimeouts() {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testOverrideFailureHandler() {
+        final FailureHandler failureHandler = (failure) -> {};
+        final ElasticsearchSink sink =
+                createMinimalBuilder().setFailureHandler(failureHandler).build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+        final BulkResponseInspector bulkResponseInspector =
+                sink.ge

Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-14 Thread via GitHub


reswqa commented on code in PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1427487497


##
flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java:
##
@@ -99,7 +119,120 @@ void testThrowIfSetInvalidTimeouts() {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testOverrideFailureHandler() {
+        final FailureHandler failureHandler = (failure) -> {};
+        final ElasticsearchSink sink =
+                createMinimalBuilder().setFailureHandler(failureHandler).build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+        final BulkResponseInspector bulkResponseInspector =
+                sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+        assertThat(bulkResponseInspector)
+                .isInstanceOf(DefaultBulkResponseInspector.class)
+                .extracting(
+                        (inspector) -> ((DefaultBulkResponseInspector) inspector).failureHandler)
+                .isEqualTo(failureHandler);
+    }
+
+    @Test
+    void testOverrideBulkResponseInspectorFactory() {
+        final AtomicBoolean called = new AtomicBoolean();
+        final BulkResponseInspectorFactory bulkResponseInspectorFactory =
+                initContext -> {
+                    final MetricGroup metricGroup = initContext.metricGroup();
+                    metricGroup.addGroup("bulk").addGroup("result", "failed").counter("actions");
+                    called.set(true);
+                    return (BulkResponseInspector) (request, response) -> {};
+                };
+        final ElasticsearchSink sink =
+                createMinimalBuilder()
+                        .setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
+                        .build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+
+        assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
+        assertThat(called).isTrue();
+    }
+
     abstract B createEmptyBuilder();
 
     abstract B createMinimalBuilder();
+
+    private static class DummyMailboxExecutor implements MailboxExecutor {
+        private DummyMailboxExecutor() {}
+
+        public void execute(
+                ThrowingRunnable command,
+                String descriptionFormat,
+                Object... descriptionArgs) {}
+
+        public void yield() throws InterruptedException, FlinkRuntimeException {}
+
+        public boolean tryYield() throws FlinkRuntimeException {
+            return false;
+        }
+    }
+
+    private static class MockInitContext
+            implements Sink.InitContext, SerializationSchema.InitializationContext {
+
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return SimpleUserCodeClassLoader.create(
+                    ElasticsearchSinkBuilderBaseTest.class.getClassLoader());
+        }
+
+        public MailboxExecutor getMailboxExecutor() {
+            return new ElasticsearchSinkBuilderBaseTest.DummyMailboxExecutor();
+        }
+
+        public ProcessingTimeService getProcessingTimeService() {
+            return new TestProcessingTimeService();
+        }
+
+        public int getSubtaskId() {
+            return 0;
+        }
+
+        public int getNumberOfParallelSubtasks() {
+            return 0;
+        }
+
+        public int getAttemptNumber() {
+            return 0;
+        }
+
+        public SinkWriterMetricGroup metricGroup() {

Review Comment:
   We should also register an IOMetricGroup.
   
   ```java
   public SinkWriterMetricGroup metricGroup() {
       final OperatorIOMetricGroup operatorIOMetricGroup =
               UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
       return InternalSinkWriterMetricGroup.wrap(
               new TestingSinkWriterMetricGroup.Builder()
                       .setParentMetricGroup(new UnregisteredMetricsGroup())
                       .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
                       .build());
   }
   ```






Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-13 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1855334073

   @reswqa, thanks, `TestingSinkWriterMetricGroup` works on all tested versions. So far, I have only hit the arch rules issue locally.





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-13 Thread via GitHub


reswqa commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1855090968

   @schulzp, thank you for the investigation.
   
   1. I think `TestingSinkWriterMetricGroup` might help.
   2. Yes, this is indeed a violation, but I'm not sure whether it will lead to a CI failure. Could you link a workflow run that failed because of this? Anyway, I agree that this would go beyond the scope of this PR, and I can fix it later.





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-13 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1853540370

   @reswqa, there are two things that need to be fixed:
   
   1. Use `MetricsGroupTestUtils#mockWriterMetricGroup()` instead of `InternalSinkWriterMetricGroup.mock()`. However, the underlying change was introduced [only recently](https://github.com/apache/flink/commit/92951a05127f1e0e2ab0ea04ae022659fc5276ab), so code relying on it would not be compatible with 1.17.1 or 1.18.0.
   2. Fix the usage of non-public API, which is enforced by the `ConnectorRules`. This would go beyond the scope of this PR, though, since the main branch already triggers those violations (only on Java 11 and 17); see below.
   
   ```
   [ERROR] Failures: 
   [ERROR]   Architecture Violation [Priority: MEDIUM] - Rule 'Connector 
production code must depend only on public API when outside of connector 
packages' was violated (11 times):
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, 
org.apache.flink.connector.base.DeliveryGuarantee, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls 
method  in (ElasticsearchSink.java:73)
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, 
org.apache.flink.connector.base.DeliveryGuarantee, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls 
method  in 
(ElasticsearchSink.java:71)
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, 
org.apache.flink.connector.base.DeliveryGuarantee, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls 
method  in 
(ElasticsearchSink.java:72)
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, 
org.apache.flink.connector.base.DeliveryGuarantee, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls 
method  in 
(ElasticsearchSink.java:74)
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, 
org.apache.flink.connector.base.DeliveryGuarantee, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls 
method  in 
(ElasticsearchSink.java:75)
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, 
org.apache.flink.connector.base.DeliveryGuarantee, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls 
method  in 
(ElasticsearchSink.java:76)
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, 
org.apache.flink.connector.base.DeliveryGuarantee, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls 
method  in 
(ElasticsearchSink.java:77)
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, boolean, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, 
org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig, 
org.apache.flink.metrics.groups.SinkWriterMetricGroup, 
org.apache.flink.api.common.operators.MailboxExecutor)> calls method 
 in 
(ElasticsearchWriter.java:107)
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, boolean, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, 
org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig, 
org.apache.flink.metrics.groups.SinkWriterMetricGroup, 
org.apache.flink.api.common.operators.MailboxExecutor)> calls method 
 in 
(ElasticsearchWriter.java:97)
   Constructor 
(java.util.List,
 org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, boolean, 
org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, 
org.apache.fl

Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-12 Thread via GitHub


reswqa commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1853168166

   @schulzp That PR has been merged. I have pushed an empty commit to your 
branch to re-trigger CI.





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-11 Thread via GitHub


reswqa commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1851249351

   We should wait for https://github.com/apache/flink/pull/23876 to be merged (this won't take long, as it already has three approvals). After that, CI should be able to pass.





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-11 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1850111290

   @reswqa, I fixed the cause of the `NotSerializableException`: an anonymous class held an implicit reference to the enclosing `ElasticsearchSinkBuilderBase`. CI should pass now.
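
For readers unfamiliar with this failure mode, the following is a minimal, self-contained sketch of how an anonymous class can pull a non-serializable enclosing instance into Java serialization. `CaptureSketch`, `InspectorFactory`, and `Builder` are hypothetical stand-ins, not the connector's classes, and the lambda-based remedy is only one possible fix, not necessarily the one applied in this PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureSketch {

    /** Hypothetical stand-in for a serializable factory interface. */
    interface InspectorFactory extends Serializable {
        String create();
    }

    /** Hypothetical stand-in for a builder that is itself NOT serializable. */
    static class Builder {
        private final String name = "default";

        /** Anonymous class in an instance method: holds a hidden reference to Builder.this. */
        InspectorFactory capturingFactory() {
            return new InspectorFactory() {
                @Override
                public String create() {
                    return name; // reads Builder.this.name, so the outer instance is captured
                }
            };
        }

        /** Copies the needed state into a local first, so only the String is captured. */
        InspectorFactory nonCapturingFactory() {
            final String localName = name;
            return () -> localName;
        }
    }

    /** Returns false if Java serialization rejects the object graph. */
    static boolean isSerializable(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder();
        System.out.println(isSerializable(builder.capturingFactory()));    // false
        System.out.println(isSerializable(builder.nonCapturingFactory())); // true
    }
}
```

The design point is that the factory should capture only the values it needs, never the enclosing builder. A static nested class that receives those values through its constructor would achieve the same result.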





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-07 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1845356753

   @reswqa, I overrode the missing methods required by 1.18.0's 
`Sink.InitContext`.





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on code in PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1418511130


##
flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.assertj.core.api.Assertions;
+import org.elasticsearch.action.DocWriteRequest.OpType;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+public class DefaultBulkResponseInspectorTest {
+
+    @Test
+    void testPassWithoutFailures() {
+        final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
+        inspector.inspect(new BulkRequest(), new BulkResponse(new BulkItemResponse[] {}, 0));
+    }
+
+    @Test
+    void testThrowsChainedFailure() {
+        final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new BulkResponse(
+                                            new BulkItemResponse[] {
+                                                new BulkItemResponse(
+                                                        0, OpType.CREATE, (DocWriteResponse) null),
+                                                new BulkItemResponse(
+                                                        1,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new IOException("A"))),
+                                                new BulkItemResponse(
+                                                        2,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new IOException("B")))
+                                            },
+                                            0));
+                        });
+    }
+
+    @Test
+    void testPassesDespiteChainedFailure() {
+        final DefaultBulkResponseInspector inspector =
+                new DefaultBulkResponseInspector((failure) -> {});
+        Assertions.assertThatCode(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new Bulk

Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on code in PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1418486556


##
flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.assertj.core.api.Assertions;
+import org.elasticsearch.action.DocWriteRequest.OpType;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+public class DefaultBulkResponseInspectorTest {
+
+    @Test
+    void testPassWithoutFailures() {
+        final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
+        inspector.inspect(new BulkRequest(), new BulkResponse(new BulkItemResponse[] {}, 0));
+    }
+
+    @Test
+    void testThrowsChainedFailure() {
+        final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new BulkResponse(
+                                            new BulkItemResponse[] {
+                                                new BulkItemResponse(
+                                                        0, OpType.CREATE, (DocWriteResponse) null),
+                                                new BulkItemResponse(
+                                                        1,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new IOException("A"))),
+                                                new BulkItemResponse(
+                                                        2,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new IOException("B")))
+                                            },
+                                            0));
+                        });
+    }
+
+    @Test
+    void testPassesDespiteChainedFailure() {
+        final DefaultBulkResponseInspector inspector =
+                new DefaultBulkResponseInspector((failure) -> {});
+        Assertions.assertThatCode(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new Bulk

Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


reswqa commented on code in PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1418211399


##
flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.assertj.core.api.Assertions;
+import org.elasticsearch.action.DocWriteRequest.OpType;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+public class DefaultBulkResponseInspectorTest {
+
+    @Test
+    void testPassWithoutFailures() {
+        final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
+        inspector.inspect(new BulkRequest(), new BulkResponse(new BulkItemResponse[] {}, 0));

Review Comment:
   We'd better use `Assertions.assertThatCode(() -> inspector.inspect(new 
BulkRequest(), new BulkResponse(new BulkItemResponse[] {}, 
0))).doesNotThrowAnyException();` here. It's more expressive.



##
flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.assertj.core.api.Assertions;
+import org.elasticsearch.action.DocWriteRequest.OpType;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+public class DefaultBulkResponseInspectorTest {
+
+    @Test
+    void testPassWithoutFailures() {
+        final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
+        inspector.inspect(new BulkRequest(), new BulkResponse(new BulkItemResponse[] {}, 0));
+    }
+
+    @Test
+    void testThrowsChainedFailure() {
+        final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), new DeleteRequest());
+
+   

Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843307919

   @afedulov, fixed the test (and extended it)
   
   @reta, I added a `FailureHandler` with a default implementation that 
resembles the current fail-on-any-failure behavior.
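
   For illustration only, a minimal sketch of what a handler with a 
fail-on-any-failure default could look like. Everything in it is an assumption 
made for the example: the `report` helper, the `FAIL_ON_ANY` constant and the 
plain JDK exceptions are stand-ins, not the connector's actual classes.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

final class FailureHandlerSketch {

    /** Hypothetical callback: receives the combined failure of one bulk request. */
    @FunctionalInterface
    interface FailureHandler {
        void onFailure(Throwable failure);
    }

    /** Assumed default: any reported failure is escalated as an unchecked exception. */
    static final FailureHandler FAIL_ON_ANY =
            failure -> {
                throw new IllegalStateException("Bulk request failed", failure);
            };

    /** Hypothetical helper: bundles item failures and hands them to the handler once. */
    static void report(List<? extends Throwable> itemFailures, FailureHandler handler) {
        if (itemFailures.isEmpty()) {
            return; // nothing failed, so the handler is never invoked
        }
        RuntimeException chained =
                new RuntimeException(itemFailures.size() + " bulk item(s) failed");
        for (Throwable itemFailure : itemFailures) {
            chained.addSuppressed(itemFailure); // keep every individual cause
        }
        handler.onFailure(chained);
    }

    public static void main(String[] args) {
        List<IOException> failures = Arrays.asList(new IOException("A"), new IOException("B"));

        // A lenient handler swallows the failure, so this call returns normally.
        report(failures, failure -> {});

        // The default handler escalates instead.
        try {
            report(failures, FAIL_ON_ANY);
        } catch (IllegalStateException e) {
            System.out.println("escalated: " + e.getCause().getMessage());
        }
    }
}
```

   Passing `failure -> {}` mirrors the lenient handler used in 
`testPassesDespiteChainedFailure`.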


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843137406

   @afedulov, sure, I'll implement it with plain Java.





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


afedulov commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843121034

   @schulzp thanks! 
   We try to avoid Mockito unless external dependencies that require concrete 
classes rather than interfaces make it unavoidable. Generally speaking, 
resorting to Mockito is discouraged.
   
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations
   In this case it is not necessary; you can just create mock test 
implementations as private static classes within the test.
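
   A rough sketch of that pattern, with made-up names throughout 
(`FailureListener`, `Reporter` and `RecordingListener` are hypothetical and do 
not exist in the connector): the private static class records what it 
receives, and the test inspects the recording instead of verifying a mock.

```java
import java.util.ArrayList;
import java.util.List;

final class HandWrittenDoubleSketch {

    /** Hypothetical collaborator that a Mockito mock would otherwise stand in for. */
    interface FailureListener {
        void onFailure(Throwable failure);
    }

    /** Hypothetical unit under test: forwards every failure to its listener. */
    static final class Reporter {
        private final FailureListener listener;

        Reporter(FailureListener listener) {
            this.listener = listener;
        }

        void report(Throwable failure) {
            listener.onFailure(failure);
        }
    }

    /** Hand-written test implementation: records calls so the test can inspect them. */
    private static final class RecordingListener implements FailureListener {
        final List<Throwable> seen = new ArrayList<>();

        @Override
        public void onFailure(Throwable failure) {
            seen.add(failure);
        }
    }

    /** Plays the role of a test method: exercise the reporter, then read the double. */
    static List<String> recordedMessages() {
        RecordingListener listener = new RecordingListener();
        Reporter reporter = new Reporter(listener);
        reporter.report(new IllegalStateException("A"));
        reporter.report(new IllegalStateException("B"));

        List<String> messages = new ArrayList<>();
        for (Throwable failure : listener.seen) {
            messages.add(failure.getMessage());
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(recordedMessages());
    }
}
```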
   
   
   





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


reta commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843109798

   > I assume there shared code to be reused and that is more about API 
consistency?
   
   There is no shared code (I think that is what you meant); indeed, it is 
more about API consistency across both connectors.
   





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843099110

   @reta, sure, so I'll rename my interfaces to match those from opensearch and 
add the factory to the opensearch sink builder afterwards. I assume there 
shared code to be reused and that is more about API consistency?





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


reta commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843053300

   > @reta, I checked out the opensearch failure handler. It achieves the 
same with regard to error handling. However, it does not allow capturing 
additional metrics. At least for us, that would not suffice.
   
   Thanks @schulzp, I think we could:
- backport the change from Opensearch connector
- enhance it with an ability to capture additional metrics
- port this change to Opensearch and Elasticsearch connectors





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843046355

   @afedulov, thanks! I added a test that makes sure the inspector is passed to 
the `ElasticsearchWriter`.
   
   @reswqa, I looked into the failed pipelines: except for one (against Flink 
1.19-SNAPSHOT), all of them have been canceled. The pipeline logs do not state 
why. Apparently some interfaces have been moved or changed in 1.19-SNAPSHOT, so 
it no longer compiles.
   
   @reta, I checked out the opensearch failure handler. It achieves the same 
with regard to error handling. However, it does not allow capturing additional 
metrics. At least for us, that would not suffice.





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


reta commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1842971156

   @schulzp a similar change was introduced into `flink-connector-opensearch` 
[1]. I believe we could backport it to the Elasticsearch connector to have a 
similar model of configuration and failure handling.
   
   [1] https://github.com/apache/flink-connector-opensearch/pull/11





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-05 Thread via GitHub


afedulov commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1841113426

   @schulzp thanks for the contribution. The approach looks solid. I believe 
what is missing are some tests that check that the non-default inspector is 
actually utilized when set and that the MetricGroup is propagated to it 
correctly.
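
   Such a test could be shaped roughly as below. This is only a sketch under 
stated assumptions: `Metrics`, `Inspector`, `InspectorFactory` and `Writer` are 
simplified stand-ins invented for the example, not Flink's `MetricGroup` or the 
connector's writer.

```java
import java.util.HashMap;
import java.util.Map;

final class InspectorWiringSketch {

    /** Stand-in for a metric group: a tiny registry of named counters. */
    static final class Metrics {
        final Map<String, Long> counters = new HashMap<>();

        void inc(String name) {
            counters.merge(name, 1L, Long::sum);
        }
    }

    /** Hypothetical inspector, reduced to a single flag per bulk response. */
    interface Inspector {
        void inspect(boolean hasFailures);
    }

    /** Hypothetical factory: receives the writer's metrics when the inspector is built. */
    interface InspectorFactory {
        Inspector create(Metrics metrics);
    }

    /** Stand-in writer: builds its inspector from the factory and calls it per response. */
    static final class Writer {
        private final Inspector inspector;

        Writer(Metrics metrics, InspectorFactory factory) {
            this.inspector = factory.create(metrics);
        }

        void onBulkResponse(boolean hasFailures) {
            inspector.inspect(hasFailures);
        }
    }

    /** Test double: remembers the metrics it was given and counts failures on them. */
    static final class CountingInspectorFactory implements InspectorFactory {
        Metrics received;

        @Override
        public Inspector create(Metrics metrics) {
            received = metrics;
            return hasFailures -> {
                if (hasFailures) {
                    metrics.inc("bulkFailures");
                }
            };
        }
    }

    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        CountingInspectorFactory factory = new CountingInspectorFactory();
        Writer writer = new Writer(metrics, factory);

        writer.onBulkResponse(true);
        writer.onBulkResponse(false);

        // Both checks together show the custom inspector was used with the right metrics.
        System.out.println("same metrics instance: " + (factory.received == metrics));
        System.out.println("bulkFailures: " + metrics.counters.get("bulkFailures"));
    }
}
```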





Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-01 Thread via GitHub


boring-cyborg[bot] commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1835980923

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   





[PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-01 Thread via GitHub


schulzp opened a new pull request, #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83

   Extracted `BulkResponseInspector` interface to allow custom handling of 
(partially) failed bulk requests. If not overridden, default behaviour remains 
unchanged and partial failures are escalated.
   
   * fixes https://issues.apache.org/jira/browse/FLINK-32028
   * allows custom metrics to be exposed
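
   To illustrate the idea without pulling in Elasticsearch or Flink, here is a 
simplified sketch. The `Item` type, the one-argument `inspect` signature and 
both inspector classes are assumptions made for the example; the real 
interface works on the bulk request and response objects.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class BulkInspectorSketch {

    /** Stand-in for one bulk response item; failure is null when the item succeeded. */
    static final class Item {
        final int id;
        final Throwable failure;

        Item(int id, Throwable failure) {
            this.id = id;
            this.failure = failure;
        }

        boolean isFailed() {
            return failure != null;
        }
    }

    /** Hypothetical, reduced shape of the extracted interface. */
    interface BulkResponseInspector {
        void inspect(List<Item> items);
    }

    /** Default-like behaviour: escalate as soon as any item has failed. */
    static final class EscalatingInspector implements BulkResponseInspector {
        @Override
        public void inspect(List<Item> items) {
            for (Item item : items) {
                if (item.isFailed()) {
                    throw new IllegalStateException("Item " + item.id + " failed", item.failure);
                }
            }
        }
    }

    /** Custom behaviour: tolerate partial failures and expose them as a counter. */
    static final class CountingInspector implements BulkResponseInspector {
        final AtomicLong failedItems = new AtomicLong(); // stand-in for a metric counter

        @Override
        public void inspect(List<Item> items) {
            for (Item item : items) {
                if (item.isFailed()) {
                    failedItems.incrementAndGet();
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Item> items =
                Arrays.asList(
                        new Item(0, null),
                        new Item(1, new IOException("A")),
                        new Item(2, new IOException("B")));

        CountingInspector counting = new CountingInspector();
        counting.inspect(items); // returns normally despite two failed items
        System.out.println("failed items: " + counting.failedItems.get());

        try {
            new EscalatingInspector().inspect(items);
        } catch (IllegalStateException e) {
            System.out.println("escalated: " + e.getMessage());
        }
    }
}
```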

