Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
reswqa commented on PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1857562853

Thanks @schulzp, nice one.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
boring-cyborg[bot] commented on PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1857562560

Awesome work, congrats on your first merged pull request!
reswqa merged PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83
reswqa commented on code in PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1427487497

## flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java: ##

```diff
@@ -99,7 +119,120 @@ void testThrowIfSetInvalidTimeouts() {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testOverrideFailureHandler() {
+        final FailureHandler failureHandler = (failure) -> {};
+        final ElasticsearchSink sink =
+                createMinimalBuilder().setFailureHandler(failureHandler).build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+        final BulkResponseInspector bulkResponseInspector =
+                sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+        assertThat(bulkResponseInspector)
+                .isInstanceOf(DefaultBulkResponseInspector.class)
+                .extracting(
+                        (inspector) -> ((DefaultBulkResponseInspector) inspector).failureHandler)
+                .isEqualTo(failureHandler);
+    }
+
+    @Test
+    void testOverrideBulkResponseInspectorFactory() {
+        final AtomicBoolean called = new AtomicBoolean();
+        final BulkResponseInspectorFactory bulkResponseInspectorFactory =
+                initContext -> {
+                    final MetricGroup metricGroup = initContext.metricGroup();
+                    metricGroup.addGroup("bulk").addGroup("result", "failed").counter("actions");
+                    called.set(true);
+                    return (BulkResponseInspector) (request, response) -> {};
+                };
+        final ElasticsearchSink sink =
+                createMinimalBuilder()
+                        .setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
+                        .build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+
+        assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
+        assertThat(called).isTrue();
+    }
+
     abstract B createEmptyBuilder();
 
     abstract B createMinimalBuilder();
+
+    private static class DummyMailboxExecutor implements MailboxExecutor {
+        private DummyMailboxExecutor() {}
+
+        public void execute(
+                ThrowingRunnable command,
+                String descriptionFormat,
+                Object... descriptionArgs) {}
+
+        public void yield() throws InterruptedException, FlinkRuntimeException {}
+
+        public boolean tryYield() throws FlinkRuntimeException {
+            return false;
+        }
+    }
+
+    private static class MockInitContext
+            implements Sink.InitContext, SerializationSchema.InitializationContext {
+
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return SimpleUserCodeClassLoader.create(
+                    ElasticsearchSinkBuilderBaseTest.class.getClassLoader());
+        }
+
+        public MailboxExecutor getMailboxExecutor() {
+            return new ElasticsearchSinkBuilderBaseTest.DummyMailboxExecutor();
+        }
+
+        public ProcessingTimeService getProcessingTimeService() {
+            return new TestProcessingTimeService();
+        }
+
+        public int getSubtaskId() {
+            return 0;
+        }
+
+        public int getNumberOfParallelSubtasks() {
+            return 0;
+        }
+
+        public int getAttemptNumber() {
+            return 0;
+        }
+
+        public SinkWriterMetricGroup metricGroup() {
```

Review Comment:
We should also register the `IOMetricGroup`; otherwise, it will throw an NPE.

```java
public SinkWriterMetricGroup metricGroup() {
    final OperatorIOMetricGroup operatorIOMetricGroup =
            UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
                    .getIOMetricGroup();
    return InternalSinkWriterMetricGroup.wrap(
            new TestingSinkWriterMetricGroup.Builder()
                    .setParentMetricGroup(new UnregisteredMetricsGroup())
                    .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
                    .build());
}
```
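The review comment implies that creating the writer dereferences the IO metric group of the sink's metric group. A test context whose metric group never had an IO group registered would therefore fail with an NPE. The following is a minimal, dependency-free sketch of that failure mode under exactly that assumption. `IoMetricGroup`, `TestingWriterMetricGroup`, `Writer` and `canCreateWriter` are hypothetical stand-ins, not Flink's `OperatorIOMetricGroup` or `TestingSinkWriterMetricGroup`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class IoMetricGroupSketch {

    /** Hypothetical stand-in for an IO metric group: just named counters. */
    static final class IoMetricGroup {
        private final Map<String, Long> counters = new HashMap<>();

        void inc(String name) {
            counters.merge(name, 1L, Long::sum);
        }

        long get(String name) {
            return counters.getOrDefault(name, 0L);
        }
    }

    /** Hypothetical writer metric group whose IO group comes from a configurable supplier. */
    static final class TestingWriterMetricGroup {
        private final Supplier<IoMetricGroup> ioMetricGroupSupplier;

        TestingWriterMetricGroup(Supplier<IoMetricGroup> ioMetricGroupSupplier) {
            this.ioMetricGroupSupplier = ioMetricGroupSupplier;
        }

        IoMetricGroup getIoMetricGroup() {
            return ioMetricGroupSupplier.get();
        }
    }

    /** Hypothetical writer that touches IO metrics while it is being constructed. */
    static final class Writer {
        Writer(TestingWriterMetricGroup metricGroup) {
            // No null check: a supplier returning null makes this line throw an NPE.
            metricGroup.getIoMetricGroup().inc("writerCreated");
        }
    }

    /** Returns true if a writer can be created against the given metric group. */
    static boolean canCreateWriter(TestingWriterMetricGroup metricGroup) {
        try {
            new Writer(metricGroup);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Analogous to a test double whose IO metric group was never registered.
        System.out.println(canCreateWriter(new TestingWriterMetricGroup(() -> null)));

        // Analogous to setIoMetricGroupSupplier(() -> operatorIOMetricGroup) in the review comment.
        IoMetricGroup ioMetricGroup = new IoMetricGroup();
        System.out.println(canCreateWriter(new TestingWriterMetricGroup(() -> ioMetricGroup)));
        System.out.println(ioMetricGroup.get("writerCreated"));
    }
}
```

Running `main` prints `false`, `true` and `1`. Supplying the IO group through a supplier keeps the test double cheap while still giving the writer a non-null group to use.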
schulzp commented on PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1855334073

@reswqa, thanks, `TestingSinkWriterMetricGroup` works on all tested versions. So far, I have only run into the arch rules issue locally.
reswqa commented on PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1855090968

@schulzp, thank you for the investigation.

1. I think `TestingSinkWriterMetricGroup` might help.
2. Yes, this is indeed a violation. But I'm not sure whether it will lead to a CI failure; could you link a workflow run that failed because of it? Anyway, I agree that this goes beyond the scope of this PR, and I can fix it later.
schulzp commented on PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1853540370

@reswqa, there are two things that need to be fixed:

1. Use `MetricsGroupTestUtils#mockWriterMetricGroup()` instead of `InternalSinkWriterMetricGroup.mock()`. However, the underlying change was introduced [only recently](https://github.com/apache/flink/commit/92951a05127f1e0e2ab0ea04ae022659fc5276ab), so code relying on it would not be compatible with 1.17.1 or 1.18.0.
2. Fix the usage of non-public API, which is enforced by the `ConnectorRules`. However, this would go beyond the scope of this PR, since the main branch already triggers those violations (on Java 11 and 17 only); see below:

```
[ERROR] Failures:
[ERROR] Architecture Violation [Priority: MEDIUM] - Rule 'Connector production code must depend only on public API when outside of connector packages' was violated (11 times):
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls method in (ElasticsearchSink.java:73)
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls method in (ElasticsearchSink.java:71)
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls method in (ElasticsearchSink.java:72)
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls method in (ElasticsearchSink.java:74)
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls method in (ElasticsearchSink.java:75)
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls method in (ElasticsearchSink.java:76)
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig)> calls method in (ElasticsearchSink.java:77)
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, boolean, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig, org.apache.flink.metrics.groups.SinkWriterMetricGroup, org.apache.flink.api.common.operators.MailboxExecutor)> calls method in (ElasticsearchWriter.java:107)
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, boolean, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory, org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig, org.apache.flink.metrics.groups.SinkWriterMetricGroup, org.apache.flink.api.common.operators.MailboxExecutor)> calls method in (ElasticsearchWriter.java:97)
Constructor (java.util.List, org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter, boolean, org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig, org.apache.fl
```
reswqa commented on PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1853168166

@schulzp, that PR has been merged. I have pushed an empty commit to your branch to re-trigger CI.
reswqa commented on PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1851249351

We should wait for https://github.com/apache/flink/pull/23876 to be merged (this won't take long, as it has already received three approvals). After that, CI should pass.
schulzp commented on PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1850111290

@reswqa, I fixed the cause of the `NotSerializableException`: an anonymous class held an implicit reference to the enclosing `ElasticsearchSinkBuilderBase`. CI should pass now.
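The `NotSerializableException` mentioned above is a general Java pitfall. An anonymous class created in an instance method that uses the enclosing instance keeps a hidden reference to that instance. Serializing the anonymous object therefore also tries to serialize the enclosing, non-serializable object. The following is a minimal sketch of that effect, not necessarily how the PR fixed it. `Builder` is a hypothetical stand-in for a non-serializable class such as `ElasticsearchSinkBuilderBase`, and `SerializableFactory` is illustrative only. The sketch contrasts the anonymous class with a lambda that captures only a local copy of the value it needs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class AnonymousClassSerializationSketch {

    /** Hypothetical serializable callback, like something a sink ships to the cluster. */
    interface SerializableFactory extends Serializable {
        String create();
    }

    /** Hypothetical builder that is NOT serializable itself. */
    static class Builder {
        private final String prefix;

        Builder(String prefix) {
            this.prefix = prefix;
        }

        /** Anonymous class: reading `prefix` goes through a hidden Builder.this field. */
        SerializableFactory anonymousFactory() {
            return new SerializableFactory() {
                @Override
                public String create() {
                    return prefix + "anonymous";
                }
            };
        }

        /** Lambda capturing only a local copy of the field, so no reference to the builder. */
        SerializableFactory lambdaFactory() {
            final String localPrefix = prefix;
            return () -> localPrefix + "lambda";
        }
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean isSerializable(Object object) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(object);
            return true;
        } catch (NotSerializableException e) {
            return false; // some reachable object (here: the Builder) is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder("doc-");
        System.out.println(isSerializable(builder.anonymousFactory()));
        System.out.println(isSerializable(builder.lambdaFactory()));
    }
}
```

Running `main` prints `false` and then `true`. Copying the needed value into a local variable, or using a static nested class, keeps the enclosing builder out of the serialized object graph.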
schulzp commented on PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1845356753

@reswqa, I overrode the missing methods required by 1.18.0's `Sink.InitContext`.
schulzp commented on code in PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1418511130

## flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java: ##

```diff
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.assertj.core.api.Assertions;
+import org.elasticsearch.action.DocWriteRequest.OpType;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+public class DefaultBulkResponseInspectorTest {
+
+    @Test
+    void testPassWithoutFailures() {
+        final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
+        inspector.inspect(new BulkRequest(), new BulkResponse(new BulkItemResponse[] {}, 0));
+    }
+
+    @Test
+    void testThrowsChainedFailure() {
+        final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new BulkResponse(
+                                            new BulkItemResponse[] {
+                                                new BulkItemResponse(
+                                                        0, OpType.CREATE, (DocWriteResponse) null),
+                                                new BulkItemResponse(
+                                                        1,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new IOException("A"))),
+                                                new BulkItemResponse(
+                                                        2,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new IOException("B")))
+                                            },
+                                            0));
+                        });
+    }
+
+    @Test
+    void testPassesDespiteChainedFailure() {
+        final DefaultBulkResponseInspector inspector =
+                new DefaultBulkResponseInspector((failure) -> {});
+        Assertions.assertThatCode(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new Bulk
```
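The tests in this hunk pin down the inspector's observable behaviour. With the default constructor, a response containing failed items makes `inspect` throw a `FlinkRuntimeException`. With `(failure) -> {}` as the failure handler, the same response passes. The following is a minimal, self-contained sketch of that idea, not the connector's code. `ItemResult`, `FailureHandler`, `THROWING_HANDLER` and `inspect` are hypothetical stand-ins for Elasticsearch's `BulkItemResponse` and the connector's types. Chaining the failures via `Throwable#addSuppressed` is an assumption, and plain JDK exceptions replace `FlinkRuntimeException` to keep the sketch dependency-free.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class BulkFailureChainingSketch {

    /** Hypothetical stand-in for the connector's failure handler: receives the chained failure. */
    @FunctionalInterface
    interface FailureHandler {
        void onFailure(Throwable failure);
    }

    /** Hypothetical per-item outcome of a bulk request; a null cause means success. */
    static final class ItemResult {
        final int index;
        final Throwable cause;

        ItemResult(int index, Throwable cause) {
            this.index = index;
            this.cause = cause;
        }
    }

    /** Assumed default behaviour: rethrow, which would fail the job. */
    static final FailureHandler THROWING_HANDLER =
            failure -> {
                throw new IllegalStateException("Bulk request contained failed items", failure);
            };

    /**
     * Chains all item failures into one Throwable (first failure is primary, later ones are
     * attached as suppressed) and calls the handler only if at least one item failed.
     */
    static void inspect(List<ItemResult> items, FailureHandler handler) {
        Throwable chained = null;
        for (ItemResult item : items) {
            if (item.cause == null) {
                continue; // this item succeeded
            }
            Throwable wrapped = new RuntimeException("Item " + item.index + " failed", item.cause);
            if (chained == null) {
                chained = wrapped;
            } else {
                chained.addSuppressed(wrapped);
            }
        }
        if (chained != null) {
            handler.onFailure(chained);
        }
    }

    public static void main(String[] args) {
        List<ItemResult> items =
                Arrays.asList(
                        new ItemResult(0, null),
                        new ItemResult(1, new IOException("A")),
                        new ItemResult(2, new IOException("B")));

        // A lenient handler, analogous to (failure) -> {} in the tests, swallows the failure.
        inspect(items, failure -> System.out.println("ignored: " + failure.getMessage()));

        try {
            inspect(items, THROWING_HANDLER);
        } catch (IllegalStateException e) {
            Throwable primary = e.getCause();
            System.out.println(
                    primary.getCause().getMessage()
                            + " + "
                            + primary.getSuppressed().length
                            + " suppressed");
        }
    }
}
```

Running `main` prints `ignored: Item 1 failed` and then `A + 1 suppressed`. Passing a single chained `Throwable` to one handler lets callers choose between failing fast and tolerating failures without changing the inspection loop.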
schulzp commented on code in PR #83:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1418486556

## flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java: ##
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
reswqa commented on code in PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#discussion_r1418211399 ## flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector; +import org.apache.flink.util.FlinkRuntimeException; + +import org.assertj.core.api.Assertions; +import org.elasticsearch.action.DocWriteRequest.OpType; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +public class DefaultBulkResponseInspectorTest { + +@Test +void testPassWithoutFailures() { +final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector(); +inspector.inspect(new BulkRequest(), new BulkResponse(new BulkItemResponse[] {}, 0)); Review Comment: We'd better use `Assertions.assertThatCode(() -> inspector.inspect(new BulkRequest(), new BulkResponse(new BulkItemResponse[] {}, 0))).doesNotThrowAnyException();` here. It's more expressive. ## flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector; +import org.apache.flink.util.FlinkRuntimeException; + +import org.assertj.core.api.Assertions; +import org.elasticsearch.action.DocWriteRequest.OpType; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +public class DefaultBulkResponseInspectorTest { + +@Test +void testPassWithoutFailures() { +final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector(); +inspector.inspect(new BulkRequest(), new BulkResponse(new BulkItemResponse[] {}, 0)); +} + +@Test +void testThrowsChainedFailure() { +final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector(); +Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) +.isThrownBy( +() -> { +final BulkRequest request = new BulkRequest(); +request.add( +new IndexRequest(), new DeleteRequest(), new DeleteRequest()); + +
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
schulzp commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843307919 @afedulov, I fixed the test (and extended it). @reta, I added a `FailureHandler` with a default implementation that mirrors the current fail-on-any-failure behavior.
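A minimal plain-Java sketch of that split, assuming a `FailureHandler` that receives one chained `Throwable` per bulk request (as the `(failure) -> {}` lambdas in the tests suggest). `Inspector` is a hypothetical simplification that takes only the per-item failures instead of a `BulkRequest`/`BulkResponse` pair, and `IllegalStateException` stands in for `FlinkRuntimeException`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class FailureHandlerSketch {

    /** Assumed shape: called once per bulk request with all item failures chained together. */
    @FunctionalInterface
    interface FailureHandler {
        void onFailure(Throwable failure);
    }

    /** Default handler mirroring the fail-on-any-failure behaviour: rethrow. */
    static final FailureHandler FAIL_ON_ANY =
            failure -> {
                throw new IllegalStateException("Bulk request failed", failure);
            };

    /** Hypothetical inspector: chains the failed items and lets the handler decide. */
    static final class Inspector {
        private final FailureHandler failureHandler;

        Inspector() {
            this(FAIL_ON_ANY);
        }

        Inspector(FailureHandler failureHandler) {
            this.failureHandler = failureHandler;
        }

        /** itemFailures contains only the failed items; an empty list means full success. */
        void inspect(List<Throwable> itemFailures) {
            RuntimeException chained = null;
            for (Throwable failure : itemFailures) {
                if (chained == null) {
                    // The first failure becomes the cause ...
                    chained = new RuntimeException("Bulk request partially failed", failure);
                } else {
                    // ... later ones are attached as suppressed exceptions.
                    chained.addSuppressed(failure);
                }
            }
            if (chained != null) {
                failureHandler.onFailure(chained);
            }
        }
    }

    public static void main(String[] args) {
        List<Throwable> failures = List.of(new IOException("A"), new IOException("B"));

        // A lenient handler that only records the failure instead of failing the job.
        List<Throwable> seen = new ArrayList<>();
        new Inspector(seen::add).inspect(failures);
        System.out.println("recorded " + seen.size() + " chained failure(s)");

        try {
            new Inspector().inspect(failures);
        } catch (IllegalStateException e) {
            System.out.println("default handler rethrew: " + e.getMessage());
        }
    }
}
```

Keeping the chaining in the inspector and only the final decision in the handler means a lenient handler never has to re-implement the per-item bookkeeping.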
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
schulzp commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843137406 @afedulov, sure, I'll implement it with plain Java.
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
afedulov commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843121034 @schulzp thanks! We try to avoid Mockito unless it is unavoidable, e.g. because external dependencies require concrete classes rather than interfaces. Generally speaking, resorting to Mockito is discouraged: https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations In this case it is not necessary; you can just create test implementations as private static classes within the test.
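To illustrate the suggestion: instead of `mock(FailureHandler.class)` plus `verify(...)`, a test can declare a small recording implementation as a private static class and assert on its state. The `FailureHandler` interface and the `forwardFailures` method below are hypothetical stand-ins, not the connector's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class MockitoFreeTestSketch {

    /** Hypothetical interface under test. */
    interface FailureHandler {
        void onFailure(Throwable failure);
    }

    /** Reusable test implementation: records every failure it receives. */
    private static final class RecordingFailureHandler implements FailureHandler {
        private final List<Throwable> failures = new ArrayList<>();

        @Override
        public void onFailure(Throwable failure) {
            failures.add(failure);
        }

        List<Throwable> getFailures() {
            return Collections.unmodifiableList(failures);
        }
    }

    /** Hypothetical code under test: forwards each failure to the handler. */
    static void forwardFailures(List<Throwable> failures, FailureHandler handler) {
        for (Throwable failure : failures) {
            handler.onFailure(failure);
        }
    }

    /** Runs the code under test against the recorder and returns what it saw. */
    static List<Throwable> recordedFailuresFor(List<Throwable> input) {
        RecordingFailureHandler handler = new RecordingFailureHandler();
        forwardFailures(input, handler);
        return handler.getFailures();
    }

    public static void main(String[] args) {
        Throwable failure = new RuntimeException("A");
        List<Throwable> recorded = recordedFailuresFor(List.of(failure));
        // Plain state assertion replaces Mockito's verify(handler).onFailure(failure).
        if (recorded.size() != 1 || recorded.get(0) != failure) {
            throw new AssertionError("handler was not called with the expected failure");
        }
        System.out.println("recorded " + recorded.size() + " failure(s)");
    }
}
```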
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
reta commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843109798 > I assume there is no shared code to be reused and that this is more about API consistency? There is no shared code (I think that is what you meant) and indeed, it is more about API consistency between both connectors.
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
schulzp commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843099110 @reta, sure, so I'll rename my interfaces to match those from OpenSearch and add the factory to the OpenSearch sink builder afterwards. I assume there is no shared code to be reused and that this is more about API consistency?
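As a rough illustration of what adding the factory to a sink builder could look like, the sketch below offers two entry points: a `setFailureHandler` convenience that keeps the default inspector, and a `setBulkResponseInspectorFactory` for full control. All types are simplified stand-ins; the factory takes an `Object` in place of the writer's init context, and in this sketch whichever setter is called last wins, which may not match the real builder.

```java
import java.util.function.Function;

class SinkBuilderSketch {

    @FunctionalInterface
    interface FailureHandler {
        void onFailure(Throwable failure);
    }

    /** Simplified: receives the chained failure of a bulk request, or null on success. */
    @FunctionalInterface
    interface BulkResponseInspector {
        void inspect(Throwable chainedFailureOrNull);
    }

    /** Stand-in: Object replaces the writer init context a real factory would receive. */
    @FunctionalInterface
    interface BulkResponseInspectorFactory extends Function<Object, BulkResponseInspector> {}

    /** Default inspector: delegates any failure to the configured handler. */
    static BulkResponseInspector defaultInspector(FailureHandler handler) {
        return failure -> {
            if (failure != null) {
                handler.onFailure(failure);
            }
        };
    }

    static final class Builder {
        // Default: fail on any failure, as before the change.
        private BulkResponseInspectorFactory factory =
                context ->
                        defaultInspector(
                                failure -> {
                                    throw new IllegalStateException(failure);
                                });

        /** Convenience: keep the default inspector, only change how failures are handled. */
        Builder setFailureHandler(FailureHandler handler) {
            this.factory = context -> defaultInspector(handler);
            return this;
        }

        /** Full control, e.g. to register extra metrics from the init context. */
        Builder setBulkResponseInspectorFactory(BulkResponseInspectorFactory factory) {
            this.factory = factory;
            return this;
        }

        BulkResponseInspectorFactory build() {
            return factory;
        }
    }

    public static void main(String[] args) {
        BulkResponseInspector lenient =
                new Builder()
                        .setFailureHandler(f -> System.out.println("ignored: " + f.getMessage()))
                        .build()
                        .apply(null);
        lenient.inspect(new RuntimeException("A"));
    }
}
```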
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
reta commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843053300 > @reta, I checked out the OpenSearch failure handler. It achieves the same with regard to error handling. However, it does not allow capturing additional metrics. At least for us that would not suffice.
Thanks @schulzp, I think we could:
- backport the change from the OpenSearch connector
- enhance it with the ability to capture additional metrics
- port this change to both the OpenSearch and Elasticsearch connectors
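One way to read the second point (capturing additional metrics): instead of a bare inspector, the builder takes a factory that is handed the writer's init context, so a custom inspector can register its own metrics once when it is created. In the sketch below, `SimpleMetricGroup` and `InitContext` are hypothetical stand-ins for Flink's `MetricGroup` and the sink's init context, and the counter name `failedActions` is made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class InspectorFactorySketch {

    /** Stand-in for a metric group: just named counters. */
    static final class SimpleMetricGroup {
        private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

        AtomicLong counter(String name) {
            return counters.computeIfAbsent(name, n -> new AtomicLong());
        }
    }

    /** Stand-in for the init context: the factory only needs the metric group. */
    @FunctionalInterface
    interface InitContext {
        SimpleMetricGroup metricGroup();
    }

    /** Simplified inspector: told how many items of a bulk request failed. */
    @FunctionalInterface
    interface BulkResponseInspector {
        void inspect(int failedItems);
    }

    @FunctionalInterface
    interface BulkResponseInspectorFactory extends Function<InitContext, BulkResponseInspector> {}

    /** Registers the counter once at creation; each inspected bulk then only increments it. */
    static final BulkResponseInspectorFactory COUNTING_FACTORY =
            context -> {
                AtomicLong failedActions = context.metricGroup().counter("failedActions");
                return failedItems -> failedActions.addAndGet(failedItems);
            };

    public static void main(String[] args) {
        SimpleMetricGroup metricGroup = new SimpleMetricGroup();
        BulkResponseInspector inspector = COUNTING_FACTORY.apply(() -> metricGroup);
        inspector.inspect(2);
        inspector.inspect(1);
        // prints "failedActions = 3"
        System.out.println("failedActions = " + metricGroup.counter("failedActions").get());
    }
}
```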
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
schulzp commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843046355 @afedulov, thanks! I added a test that makes sure the inspector is passed to the `ElasticsearchWriter`. @reswqa, I looked into the failed pipelines: except for one (against Flink 1.19-SNAPSHOT), all of them were canceled, and the pipeline logs do not state why. Apparently some interfaces have been moved or changed in 1.19-SNAPSHOT, so the connector no longer compiles against it. @reta, I checked out the OpenSearch failure handler. It achieves the same with regard to error handling. However, it does not allow capturing additional metrics. At least for us that would not suffice.
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
reta commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1842971156 @schulzp there was a similar change introduced into `flink-connector-opensearch` [1]; I believe we could backport it to the Elasticsearch connector to have a similar model of configuration and failure handling. [1] https://github.com/apache/flink-connector-opensearch/pull/11
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
afedulov commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1841113426 @schulzp thanks for the contribution. The approach looks solid. I believe what is missing are tests that check that a non-default inspector is actually used when set and that the `MetricGroup` is propagated to it correctly.
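A sketch of the kind of test being asked for, written without Mockito: a hypothetical `Writer` builds its inspector from the configured factory, and the check confirms both that the custom inspector is the one in use and that the factory saw the writer's metric group. `MetricGroupStub`, `InitContext` and `Writer` are illustrative stand-ins, not connector classes.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

class InspectorPropagationSketch {

    /** Stand-in for a metric group; only identity matters here. */
    static final class MetricGroupStub {}

    @FunctionalInterface
    interface InitContext {
        MetricGroupStub metricGroup();
    }

    @FunctionalInterface
    interface BulkResponseInspector {
        void inspect(int failedItems);
    }

    /** Hypothetical writer: creates its inspector from the factory using its own init context. */
    static final class Writer {
        final BulkResponseInspector inspector;

        Writer(Function<InitContext, BulkResponseInspector> factory, InitContext context) {
            this.inspector = factory.apply(context);
        }
    }

    /** Returns true if the custom inspector is used and its factory received the metric group. */
    static boolean customInspectorReceivesMetricGroup() {
        MetricGroupStub metricGroup = new MetricGroupStub();
        AtomicReference<MetricGroupStub> seenByFactory = new AtomicReference<>();
        BulkResponseInspector custom = failedItems -> {};

        Writer writer =
                new Writer(
                        context -> {
                            seenByFactory.set(context.metricGroup());
                            return custom;
                        },
                        () -> metricGroup);

        return writer.inspector == custom && seenByFactory.get() == metricGroup;
    }

    public static void main(String[] args) {
        System.out.println(customInspectorReceivesMetricGroup()); // true
    }
}
```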
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
boring-cyborg[bot] commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1835980923 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
[PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
schulzp opened a new pull request, #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83 Extracted a `BulkResponseInspector` interface to allow custom handling of (partially) failed bulk requests. If not overridden, the default behaviour remains unchanged and partial failures are escalated.
* fixes https://issues.apache.org/jira/browse/FLINK-32028
* allows custom metrics to be exposed
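The extraction described above can be illustrated with a self-contained plain-Java sketch. It is not the connector's code: `BulkItem` is a hypothetical stand-in for Elasticsearch's `BulkItemResponse`, the inspector receives only the item list rather than a `BulkRequest`/`BulkResponse` pair, `IllegalStateException` stands in for `FlinkRuntimeException`, and chaining later failures via `addSuppressed` is just one plausible choice.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class BulkInspectorSketch {

    /** Stand-in for one bulk item result; failure is null when the item succeeded. */
    static final class BulkItem {
        final int id;
        final Throwable failure;

        BulkItem(int id, Throwable failure) {
            this.id = id;
            this.failure = failure;
        }
    }

    /** The extracted extension point: decides what a (partially) failed bulk request means. */
    @FunctionalInterface
    interface BulkResponseInspector {
        void inspect(List<BulkItem> items);
    }

    /** Default behaviour: escalate on any failed item, chaining all item failures. */
    static final BulkResponseInspector ESCALATING =
            items -> {
                IllegalStateException chained = null;
                for (BulkItem item : items) {
                    if (item.failure == null) {
                        continue;
                    }
                    if (chained == null) {
                        chained = new IllegalStateException("Bulk request partially failed", item.failure);
                    } else {
                        chained.addSuppressed(item.failure);
                    }
                }
                if (chained != null) {
                    throw chained;
                }
            };

    /** Custom behaviour: count failed items (e.g. to expose as a metric) and carry on. */
    static BulkResponseInspector counting(AtomicLong failedItems) {
        return items -> {
            for (BulkItem item : items) {
                if (item.failure != null) {
                    failedItems.incrementAndGet();
                }
            }
        };
    }

    public static void main(String[] args) {
        List<BulkItem> items =
                List.of(
                        new BulkItem(0, null),
                        new BulkItem(1, new IOException("A")),
                        new BulkItem(2, new IOException("B")));

        AtomicLong failed = new AtomicLong();
        counting(failed).inspect(items);
        System.out.println("failed items: " + failed.get()); // failed items: 2

        try {
            ESCALATING.inspect(items);
        } catch (IllegalStateException e) {
            // prints "Bulk request partially failed (suppressed: 1)"
            System.out.println(e.getMessage() + " (suppressed: " + e.getSuppressed().length + ")");
        }
    }
}
```

Because the interface has a single method, a custom inspector can be a lambda; the counting variant shows how partial failures could feed a metric instead of failing the job.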