This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
commit d226fc3e8f36870e82fe860a8e3e46f9d82e7b8c Author: Alexander Preuß <11444089+alp...@users.noreply.github.com> AuthorDate: Mon Sep 12 15:43:52 2022 +0200 [FLINK-27185] Convert connector-elasticsearch modules to assertj --- .../sink/ElasticsearchSinkBaseITCase.java | 2 +- .../sink/ElasticsearchSinkBuilderBaseTest.java | 10 +- .../sink/ElasticsearchWriterITCase.java | 4 +- .../elasticsearch/sink/TestClientBase.java | 4 +- .../ElasticsearchDynamicSinkFactoryBaseTest.java | 278 ++++++++++----------- .../elasticsearch/ElasticsearchSinkBaseTest.java | 108 ++------ .../elasticsearch/ElasticsearchSinkTestBase.java | 51 ++-- .../table/IndexGeneratorFactoryTest.java | 95 ++++--- .../testutils/SourceSinkDataTestKit.java | 5 +- .../Elasticsearch6DynamicSinkFactoryTest.java | 22 +- .../Elasticsearch7DynamicSinkFactoryTest.java | 20 +- 11 files changed, 256 insertions(+), 343 deletions(-) diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java index 65c2af3..bd574d8 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java @@ -123,7 +123,7 @@ abstract class ElasticsearchSinkBaseITCase { runTest(index, false, TestEmitter::jsonEmitter, deliveryGuarantee, null); } catch (IllegalStateException e) { failure = true; - assertThat(deliveryGuarantee).isEqualTo(DeliveryGuarantee.EXACTLY_ONCE); + assertThat(deliveryGuarantee).isSameAs(DeliveryGuarantee.EXACTLY_ONCE); } finally { assertThat(failure).isEqualTo(deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE); } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java index 8e7d096..3fc2abc 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java @@ -30,7 +30,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link ElasticsearchSinkBuilderBase}. */ @@ -54,7 +54,7 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends ElasticsearchSinkBuild return DynamicTest.stream( validBuilders, ElasticsearchSinkBuilderBase::toString, - builder -> assertThatNoException().isThrownBy(builder::build)); + builder -> assertThatCode(builder::build).doesNotThrowAnyException()); } @Test @@ -91,11 +91,11 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends ElasticsearchSinkBuild @Test void testThrowIfSetInvalidTimeouts() { - assertThatThrownBy(() -> createEmptyBuilder().setConnectionRequestTimeout(-1)) + assertThatThrownBy(() -> createEmptyBuilder().setConnectionRequestTimeout(-1).build()) .isInstanceOf(IllegalStateException.class); - assertThatThrownBy(() -> createEmptyBuilder().setConnectionTimeout(-1)) + assertThatThrownBy(() -> createEmptyBuilder().setConnectionTimeout(-1).build()) .isInstanceOf(IllegalStateException.class); - assertThatThrownBy(() -> createEmptyBuilder().setSocketTimeout(-1)) + assertThatThrownBy(() -> createEmptyBuilder().setSocketTimeout(-1).build()) .isInstanceOf(IllegalStateException.class); } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java index 0f88d57..c4e5255 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java @@ -238,7 +238,7 @@ class ElasticsearchWriterITCase { writer.blockingFlushAllActions(); - assertThat(recordsSend.isPresent()).isTrue(); + assertThat(recordsSend).isPresent(); assertThat(recordsSend.get().getCount()).isEqualTo(3L); } } @@ -259,7 +259,7 @@ class ElasticsearchWriterITCase { writer.blockingFlushAllActions(); - assertThat(currentSendTime.isPresent()).isTrue(); + assertThat(currentSendTime).isPresent(); assertThat(currentSendTime.get().getValue()).isGreaterThan(0L); } } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestClientBase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestClientBase.java index 589057b..1dd70ce 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestClientBase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestClientBase.java @@ -42,8 +42,8 @@ abstract class TestClientBase { try { final GetResponse response = getResponse(index, id); assertThat(response.isExists()) - .isFalse() - .as(String.format("Id %s is unexpectedly present.", id)); + .as(String.format("Id %s is unexpectedly present.", id)) + .isFalse(); } catch (ElasticsearchStatusException e) { assertThat(e.status().getStatus()).isEqualTo(404); } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java index 1b008bb..8e5a98e 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java @@ -20,7 +20,6 @@ package org.apache.flink.connector.elasticsearch.table; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.connector.elasticsearch.ElasticsearchUtil; -import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; @@ -31,13 +30,21 @@ import org.apache.flink.table.connector.sink.SinkV2Provider; import org.apache.flink.types.RowKind; import org.apache.flink.util.TestLoggerExtension; -import org.assertj.core.api.ThrowableAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; import java.util.Collections; +import static org.apache.flink.table.api.DataTypes.ARRAY; +import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.BYTES; +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.MAP; +import static org.apache.flink.table.api.DataTypes.MULTISET; +import static org.apache.flink.table.api.DataTypes.RAW; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -50,192 +57,164 @@ abstract class ElasticsearchDynamicSinkFactoryBaseTest { abstract TestContext createPrefilledTestContext(); - void assertValidationException( - String expectedMessage, ThrowableAssert.ThrowingCallable executable) { - assertThatThrownBy(executable) - .isInstanceOf(ValidationException.class) - .hasMessage(expectedMessage); - } - @Test public void validateWrongIndex() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "'index' must not be empty", - () -> - sinkFactory.createDynamicTableSink( - createPrefilledTestContext() - .withOption( - ElasticsearchConnectorOptions.INDEX_OPTION.key(), - "") - .build())); + assertThatThrownBy( + () -> + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withOption( + ElasticsearchConnectorOptions.INDEX_OPTION + .key(), + "") + .build())) + .isInstanceOf(ValidationException.class) + .hasMessage("'index' must not be empty"); } @Test public void validateWrongHosts() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'.", - () -> - sinkFactory.createDynamicTableSink( - createPrefilledTestContext() - .withOption( - ElasticsearchConnectorOptions.HOSTS_OPTION.key(), - "wrong-host") - .build())); + assertThatThrownBy( + () -> + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withOption( + ElasticsearchConnectorOptions.HOSTS_OPTION + .key(), + "wrong-host") + .build())) + .isInstanceOf(ValidationException.class) + .hasMessage( + "Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'."); } @Test public void validateWrongFlushSize() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes", - () -> - sinkFactory.createDynamicTableSink( - createPrefilledTestContext() - .withOption( - ElasticsearchConnectorOptions - .BULK_FLUSH_MAX_SIZE_OPTION - .key(), - "1kb") - .build())); + assertThatThrownBy( + () -> + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withOption( + ElasticsearchConnectorOptions + .BULK_FLUSH_MAX_SIZE_OPTION + .key(), + "1kb") + .build())) + .isInstanceOf(ValidationException.class) + .hasMessage( + "'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes"); } @Test public void validateWrongRetries() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "'sink.bulk-flush.backoff.max-retries' must be at least 1. Got: 0", - () -> - sinkFactory.createDynamicTableSink( - createPrefilledTestContext() - .withOption( - ElasticsearchConnectorOptions - .BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION - .key(), - "0") - .build())); + assertThatThrownBy( + () -> + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withOption( + ElasticsearchConnectorOptions + .BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION + .key(), + "0") + .build())) + .isInstanceOf(ValidationException.class) + .hasMessage("'sink.bulk-flush.backoff.max-retries' must be at least 1. Got: 0"); } @Test public void validateWrongMaxActions() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "'sink.bulk-flush.max-actions' must be at least 1. Got: -2", - () -> - sinkFactory.createDynamicTableSink( - createPrefilledTestContext() - .withOption( - ElasticsearchConnectorOptions - .BULK_FLUSH_MAX_ACTIONS_OPTION - .key(), - "-2") - .build())); + assertThatThrownBy( + () -> + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withOption( + ElasticsearchConnectorOptions + .BULK_FLUSH_MAX_ACTIONS_OPTION + .key(), + "-2") + .build())) + .isInstanceOf(ValidationException.class) + .hasMessage("'sink.bulk-flush.max-actions' must be at least 1. Got: -2"); } @Test public void validateWrongBackoffDelay() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "Invalid value for option 'sink.bulk-flush.backoff.delay'.", - () -> - sinkFactory.createDynamicTableSink( - createPrefilledTestContext() - .withOption( - ElasticsearchConnectorOptions - .BULK_FLUSH_BACKOFF_DELAY_OPTION - .key(), - "-1s") - .build())); + assertThatThrownBy( + () -> + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withOption( + ElasticsearchConnectorOptions + .BULK_FLUSH_BACKOFF_DELAY_OPTION + .key(), + "-1s") + .build())) + .isInstanceOf(ValidationException.class) + .hasMessage("Invalid value for option 'sink.bulk-flush.backoff.delay'."); } @Test public void validatePrimaryKeyOnIllegalColumn() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "The table has a primary key on columns of illegal types: " - + "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].", - () -> - sinkFactory.createDynamicTableSink( - createPrefilledTestContext() - .withSchema( - new ResolvedSchema( - Arrays.asList( - Column.physical( - "a", - DataTypes.BIGINT() - .notNull()), - Column.physical( - "b", - DataTypes.ARRAY( - DataTypes - .BIGINT() - .notNull()) - .notNull()), - Column.physical( - "c", - DataTypes.MAP( - DataTypes - .BIGINT(), - DataTypes - .STRING()) - .notNull()), - Column.physical( - "d", - DataTypes.MULTISET( - DataTypes - .BIGINT() - .notNull()) - .notNull()), - Column.physical( - "e", - DataTypes.ROW( - DataTypes - .FIELD( - "a", - DataTypes - .BIGINT())) - .notNull()), - Column.physical( - "f", - DataTypes.RAW( - Void.class, - VoidSerializer - .INSTANCE) - .notNull()), - Column.physical( - "g", - DataTypes.BYTES() - .notNull())), - Collections.emptyList(), - UniqueConstraint.primaryKey( - "name", - Arrays.asList( - "a", "b", "c", "d", "e", - "f", "g")))) - .build())); + ResolvedSchema resolvedSchema = + new ResolvedSchema( + Arrays.asList( + Column.physical("a", BIGINT().notNull()), + Column.physical("b", ARRAY(BIGINT().notNull()).notNull()), + Column.physical("c", MAP(BIGINT(), STRING()).notNull()), + Column.physical("d", MULTISET(BIGINT().notNull()).notNull()), + Column.physical("e", ROW(FIELD("a", BIGINT())).notNull()), + Column.physical( + "f", RAW(Void.class, VoidSerializer.INSTANCE).notNull()), + Column.physical("g", BYTES().notNull())), + Collections.emptyList(), + UniqueConstraint.primaryKey( + "name", Arrays.asList("a", "b", "c", "d", "e", "f", "g"))); + + assertThatThrownBy( + () -> + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withSchema(resolvedSchema) + .build())) + .isInstanceOf(ValidationException.class) + .hasMessage( + "The table has a primary key on columns of illegal types: " + + "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY]."); } @Test public void validateWrongCredential() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "'username' and 'password' must be set at the same time. Got: username 'username' and password ''", - () -> - sinkFactory.createDynamicTableSink( - createPrefilledTestContext() - .withOption( - ElasticsearchConnectorOptions.USERNAME_OPTION.key(), - "username") - .withOption( - ElasticsearchConnectorOptions.PASSWORD_OPTION.key(), - "") - .build())); + assertThatThrownBy( + () -> + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withOption( + ElasticsearchConnectorOptions + .USERNAME_OPTION + .key(), + "username") + .withOption( + ElasticsearchConnectorOptions + .PASSWORD_OPTION + .key(), + "") + .build())) + .isInstanceOf(ValidationException.class) + .hasMessage( + "'username' and 'password' must be set at the same time. Got: username 'username' and password ''"); } @Test @@ -254,9 +233,10 @@ abstract class ElasticsearchDynamicSinkFactoryBaseTest { .addContainedKind(RowKind.DELETE) .addContainedKind(RowKind.INSERT) .build(); - assertValidationException( - "Dynamic indexing based on system time only works on append only stream.", - () -> sink.getChangelogMode(changelogMode)); + assertThatThrownBy(() -> sink.getChangelogMode(changelogMode)) + .isInstanceOf(ValidationException.class) + .hasMessage( + "Dynamic indexing based on system time only works on append only stream."); } @Test @@ -271,6 +251,6 @@ abstract class ElasticsearchDynamicSinkFactoryBaseTest { ElasticsearchDynamicSink esSink = (ElasticsearchDynamicSink) sink; SinkV2Provider provider = (SinkV2Provider) esSink.getSinkRuntimeProvider(new ElasticsearchUtil.MockContext()); - assertThat(2).isEqualTo(provider.getParallelism().get()); + assertThat(provider.getParallelism()).hasValue(2); } } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java index 98c4093..33675d1 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java @@ -36,7 +36,6 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; -import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -50,6 +49,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -104,17 +105,9 @@ public class ElasticsearchSinkBaseTest { // manually execute the next bulk request sink.manualBulkRequestWithAllPendingRequests(); - try { - testHarness.processElement(new StreamRecord<>("next msg")); - } catch (Exception e) { - // the invoke should have failed with the failure - Assert.assertTrue(e.getCause().getMessage().contains("artificial failure for record")); - - // test succeeded - return; - } - - Assert.fail(); + assertThatThrownBy(() -> testHarness.processElement(new StreamRecord<>("next msg"))) + .getCause() + .hasMessageContaining("artificial failure for record"); } /** @@ -143,18 +136,10 @@ public class ElasticsearchSinkBaseTest { // manually execute the next bulk request sink.manualBulkRequestWithAllPendingRequests(); - try { - testHarness.snapshot(1L, 1000L); - } catch (Exception e) { - // the snapshot should have failed with the failure - Assert.assertTrue( - e.getCause().getCause().getMessage().contains("artificial failure for record")); - - // test succeeded - return; - } - - Assert.fail(); + assertThatThrownBy(() -> testHarness.snapshot(1L, 1000L)) + .getCause() + .getCause() + .hasMessageContaining("artificial failure for record"); } /** @@ -213,18 +198,10 @@ public class ElasticsearchSinkBaseTest { // fail) sink.continueFlush(); - try { - snapshotThread.sync(); - } catch (Exception e) { - // the snapshot should have failed with the failure from the 2nd request - Assert.assertTrue( - e.getCause().getCause().getMessage().contains("artificial failure for record")); - - // test succeeded - return; - } - - Assert.fail(); + assertThatThrownBy(snapshotThread::sync) + .getCause() + .getCause() + .hasMessageContaining("artificial failure for record"); } /** @@ -252,18 +229,9 @@ public class ElasticsearchSinkBaseTest { // manually execute the next bulk request sink.manualBulkRequestWithAllPendingRequests(); - try { - testHarness.processElement(new StreamRecord<>("next msg")); - } catch (Exception e) { - // the invoke should have failed with the bulk request failure - Assert.assertTrue( - e.getCause().getMessage().contains("artificial failure for bulk request")); - - // test succeeded - return; - } - - Assert.fail(); + assertThatThrownBy(() -> testHarness.processElement(new StreamRecord<>("next msg"))) + .getCause() + .hasMessageContaining("artificial failure for bulk request"); } /** @@ -291,21 +259,10 @@ public class ElasticsearchSinkBaseTest { // manually execute the next bulk request sink.manualBulkRequestWithAllPendingRequests(); - try { - testHarness.snapshot(1L, 1000L); - } catch (Exception e) { - // the snapshot should have failed with the bulk request failure - Assert.assertTrue( - e.getCause() - .getCause() - .getMessage() - .contains("artificial failure for bulk request")); - - // test succeeded - return; - } - - Assert.fail(); + assertThatThrownBy(() -> testHarness.snapshot(1L, 1000L)) + .getCause() + .getCause() + .hasMessageContaining("artificial failure for bulk request"); } /** @@ -360,21 +317,10 @@ public class ElasticsearchSinkBaseTest { // let the snapshot-triggered flush continue (bulk request should fail completely) sink.continueFlush(); - try { - snapshotThread.sync(); - } catch (Exception e) { - // the snapshot should have failed with the bulk request failure - Assert.assertTrue( - e.getCause() - .getCause() - .getMessage() - .contains("artificial failure for bulk request")); - - // test succeeded - return; - } - - Assert.fail(); + assertThatThrownBy(snapshotThread::sync) + .getCause() + .getCause() + .hasMessageContaining("artificial failure for bulk request"); } /** @@ -427,7 +373,7 @@ public class ElasticsearchSinkBaseTest { } // current number of pending request should be 1 due to the re-add - Assert.assertEquals(1, sink.getNumPendingRequests()); + assertThat(sink.getNumPendingRequests()).isEqualTo(1); // this time, let the bulk request succeed, so no-more requests are re-added sink.setMockItemFailuresListForNextBulkItemResponses( @@ -482,8 +428,8 @@ public class ElasticsearchSinkBaseTest { sink.open(mock(Configuration.class)); sink.close(); - Assert.assertTrue(sinkFunction.openCalled); - Assert.assertTrue(sinkFunction.closeCalled); + assertThat(sinkFunction.openCalled).isTrue(); + assertThat(sinkFunction.closeCalled).isTrue(); } private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> { diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java index 84342fc..5c3d4ec 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java @@ -32,7 +32,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} @@ -100,33 +100,28 @@ public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> * null}. */ public void runNullAddressesTest() { - try { - createElasticsearchSink( - 1, getClusterName(), null, SourceSinkDataTestKit.getJsonSinkFunction("test")); - } catch (IllegalArgumentException | NullPointerException expectedException) { - // test passes - return; - } - - fail(); + assertThatThrownBy( + () -> + createElasticsearchSink( + 1, + getClusterName(), + null, + SourceSinkDataTestKit.getJsonSinkFunction("test"))) + .isInstanceOfAny(IllegalArgumentException.class, NullPointerException.class); } /** * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is empty. */ public void runEmptyAddressesTest() { - try { - createElasticsearchSink( - 1, - getClusterName(), - Collections.emptyList(), - SourceSinkDataTestKit.getJsonSinkFunction("test")); - } catch (IllegalArgumentException expectedException) { - // test passes - return; - } - - fail(); + assertThatThrownBy( + () -> + createElasticsearchSink( + 1, + getClusterName(), + Collections.emptyList(), + SourceSinkDataTestKit.getJsonSinkFunction("test"))) + .isInstanceOf(IllegalArgumentException.class); } /** Tests whether the Elasticsearch sink fails when there is no cluster to connect to. */ @@ -143,16 +138,8 @@ public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> SourceSinkDataTestKit.getJsonSinkFunction("test"), "123.123.123.123")); // incorrect ip address - try { - env.execute("Elasticsearch Sink Test"); - } catch (JobExecutionException expectedException) { - // every ES version throws a different exception in case of timeouts, so don't bother - // asserting on the exception - // test passes - return; - } - - fail(); + assertThatThrownBy(() -> env.execute("Elasticsearch Sink Test")) + .isInstanceOf(JobExecutionException.class); } /** Utility method to create a user config map. */ diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java index a5f7759..5941a0d 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java @@ -27,7 +27,6 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -37,11 +36,12 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.format.DateTimeFormatter; -import java.time.temporal.UnsupportedTemporalTypeException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link IndexGeneratorFactory}. */ public class IndexGeneratorFactoryTest extends TestLogger { @@ -91,12 +91,12 @@ public class IndexGeneratorFactoryTest extends TestLogger { IndexGeneratorFactory.createIndexGenerator( "{order_timestamp|yyyy_MM_dd_HH-ss}_index", schema); indexGenerator.open(); - Assert.assertEquals("2020_03_18_12-14_index", indexGenerator.generate(rows.get(0))); + assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("2020_03_18_12-14_index"); IndexGenerator indexGenerator1 = IndexGeneratorFactory.createIndexGenerator( "{order_timestamp|yyyy_MM_dd_HH_mm}_index", schema); indexGenerator1.open(); - Assert.assertEquals("2020_03_19_12_22_index", indexGenerator1.generate(rows.get(1))); + assertThat(indexGenerator1.generate(rows.get(1))).isEqualTo("2020_03_19_12_22_index"); } @Test @@ -105,8 +105,8 @@ public class IndexGeneratorFactoryTest extends TestLogger { IndexGeneratorFactory.createIndexGenerator( "my-index-{log_date|yyyy/MM/dd}", schema); indexGenerator.open(); - Assert.assertEquals("my-index-2020/03/18", indexGenerator.generate(rows.get(0))); - Assert.assertEquals("my-index-2020/03/19", indexGenerator.generate(rows.get(1))); + assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-2020/03/18"); + assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020/03/19"); } @Test @@ -114,8 +114,8 @@ public class IndexGeneratorFactoryTest extends TestLogger { IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator("my-index-{log_time|HH-mm}", schema); indexGenerator.open(); - Assert.assertEquals("my-index-12-12", indexGenerator.generate(rows.get(0))); - Assert.assertEquals("my-index-12-22", indexGenerator.generate(rows.get(1))); + assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-12-12"); + assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-12-22"); } @Test @@ -123,8 +123,8 @@ public class IndexGeneratorFactoryTest extends TestLogger { IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator("my-index-{log_time|}", schema); indexGenerator.open(); - Assert.assertEquals("my-index-12_12_14", indexGenerator.generate(rows.get(0))); - Assert.assertEquals("my-index-12_22_21", indexGenerator.generate(rows.get(1))); + assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-12_12_14"); + assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-12_22_21"); } @Test @@ -158,9 +158,10 @@ public class IndexGeneratorFactoryTest extends TestLogger { String actualIndex = indexGenerator.generate(rows.get(1)); String expectedIndex2 = "my-index-" + LocalDateTime.now().format(dateTimeFormatter); - Assert.assertTrue( - actualIndex.equals(expectedIndex1) - || actualIndex.equals(expectedIndex2)); + assertThat( + actualIndex.equals(expectedIndex1) + || actualIndex.equals(expectedIndex2)) + .isTrue(); }); List<String> invalidUseCases = @@ -187,7 +188,7 @@ public class IndexGeneratorFactoryTest extends TestLogger { schema); indexGenerator.open(); } catch (TableException e) { - Assert.assertEquals(expectedExceptionMsg, e.getMessage()); + assertThat(e).hasMessage(expectedExceptionMsg); } }); } @@ -197,8 +198,8 @@ public class IndexGeneratorFactoryTest extends TestLogger { IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator("my-index-{local_timestamp|}", schema); indexGenerator.open(); - Assert.assertEquals("my-index-2020_03_18_12_12_14Z", indexGenerator.generate(rows.get(0))); - Assert.assertEquals("my-index-2020_03_19_12_12_14Z", indexGenerator.generate(rows.get(1))); + assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-2020_03_17_19_12_14Z"); + assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020_03_20_03_22_14Z"); } @Test @@ -206,8 +207,8 @@ public class IndexGeneratorFactoryTest extends TestLogger { IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator("index_{item}", schema); indexGenerator.open(); - Assert.assertEquals("index_apple", indexGenerator.generate(rows.get(0))); - Assert.assertEquals("index_peanut", indexGenerator.generate(rows.get(1))); + assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_apple"); + assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_peanut"); } @Test @@ -215,8 +216,8 @@ public class IndexGeneratorFactoryTest extends TestLogger { IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator("my-index", schema); indexGenerator.open(); - Assert.assertEquals("my-index", indexGenerator.generate(rows.get(0))); - Assert.assertEquals("my-index", indexGenerator.generate(rows.get(1))); + assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index"); + assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index"); } @Test @@ -224,11 +225,12 @@ public class IndexGeneratorFactoryTest extends TestLogger { String expectedExceptionMsg = "Unknown field 'unknown_ts' in index pattern 'my-index-{unknown_ts|yyyy-MM-dd}'," + " please check the field name."; - try { - IndexGeneratorFactory.createIndexGenerator("my-index-{unknown_ts|yyyy-MM-dd}", schema); - } catch (TableException e) { - Assert.assertEquals(e.getMessage(), expectedExceptionMsg); - } + assertThatThrownBy( + () -> + IndexGeneratorFactory.createIndexGenerator( + "my-index-{unknown_ts|yyyy-MM-dd}", schema)) + .isInstanceOf(TableException.class) + .hasMessage(expectedExceptionMsg); } @Test @@ -236,11 +238,12 @@ public class IndexGeneratorFactoryTest extends TestLogger { String expectedExceptionMsg = "Unsupported type 'INT' found in Elasticsearch dynamic index field, " + "time-related pattern only support types are: DATE,TIME,TIMESTAMP."; - try { - IndexGeneratorFactory.createIndexGenerator("my-index-{id|yyyy-MM-dd}", schema); - } catch (TableException e) { - Assert.assertEquals(expectedExceptionMsg, e.getMessage()); - } + assertThatThrownBy( + () -> + IndexGeneratorFactory.createIndexGenerator( + "my-index-{id|yyyy-MM-dd}", schema)) + .isInstanceOf(TableException.class) + .hasMessage(expectedExceptionMsg); } @Test @@ -248,23 +251,12 @@ public class IndexGeneratorFactoryTest extends TestLogger { String expectedExceptionMsg = "Chaining dynamic index pattern my-index-{local_date}-{local_time} is not supported," + " only support single dynamic index pattern."; - try { - IndexGeneratorFactory.createIndexGenerator( - "my-index-{local_date}-{local_time}", schema); - } catch (TableException e) { - Assert.assertEquals(expectedExceptionMsg, e.getMessage()); - } - } - - @Test - public void testDynamicIndexUnsupportedFormat() { - String expectedExceptionMsg = "Unsupported field: HourOfDay"; - try { - IndexGeneratorFactory.createIndexGenerator( - "my-index-{log_date|yyyy/MM/dd HH:mm}", schema); - } catch (UnsupportedTemporalTypeException e) { - Assert.assertEquals(expectedExceptionMsg, e.getMessage()); - } + assertThatThrownBy( + () -> + IndexGeneratorFactory.createIndexGenerator( + "my-index-{local_date}-{local_time}", schema)) + .isInstanceOf(TableException.class) + .hasMessage(expectedExceptionMsg); } @Test @@ -273,10 +265,9 @@ public class IndexGeneratorFactoryTest extends TestLogger { "Unsupported type BOOLEAN of index field, Supported types are:" + " [DATE, TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE," + " TIMESTAMP_WITH_LOCAL_TIME_ZONE, VARCHAR, CHAR, TINYINT, INTEGER, BIGINT]"; - try { - IndexGeneratorFactory.createIndexGenerator("index_{status}", schema); - } catch (IllegalArgumentException e) { - Assert.assertEquals(expectedExceptionMsg, e.getMessage()); - } + assertThatThrownBy( + () -> IndexGeneratorFactory.createIndexGenerator("index_{status}", schema)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage(expectedExceptionMsg); } } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java index 584ed4d..33023a7 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java @@ -30,13 +30,14 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.junit.Assert; import java.io.IOException; import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import static org.assertj.core.api.Assertions.assertThat; + /** * This class contains utilities and a pre-defined source function and Elasticsearch Sink function * used to simulate and verify data used in tests. @@ -144,7 +145,7 @@ public class SourceSinkDataTestKit { client.get( new GetRequest(index, TYPE_NAME, Integer.toString(i)), RequestOptions.DEFAULT); - Assert.assertEquals(DATA_PREFIX + i, response.getSource().get(DATA_FIELD_NAME)); + assertThat(response.getSource().get(DATA_FIELD_NAME)).isEqualTo(DATA_PREFIX + i); } } diff --git a/flink-connector-elasticsearch6/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java b/flink-connector-elasticsearch6/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java index d8053d4..502ad72 100644 --- a/flink-connector-elasticsearch6/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java +++ b/flink-connector-elasticsearch6/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java @@ -18,9 +18,12 @@ package org.apache.flink.connector.elasticsearch.table; +import org.apache.flink.table.api.ValidationException; + import org.junit.jupiter.api.Test; import static org.apache.flink.connector.elasticsearch.table.TestContext.context; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for validation in {@link Elasticsearch6DynamicSinkFactory}. */ public class Elasticsearch6DynamicSinkFactoryTest extends ElasticsearchDynamicSinkFactoryBaseTest { @@ -42,14 +45,15 @@ public class Elasticsearch6DynamicSinkFactoryTest extends ElasticsearchDynamicSi public void validateEmptyConfiguration() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "One or more required options are missing.\n" - + "\n" - + "Missing required options are:\n" - + "\n" - + "document-type\n" - + "hosts\n" - + "index", - () -> sinkFactory.createDynamicTableSink(context().build())); + assertThatThrownBy(() -> sinkFactory.createDynamicTableSink(context().build())) + .isInstanceOf(ValidationException.class) + .hasMessage( + "One or more required options are missing.\n" + + "\n" + + "Missing required options are:\n" + + "\n" + + "document-type\n" + + "hosts\n" + + "index"); } } diff --git a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java index e824772..26c0df1 100644 --- a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java +++ b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java @@ -18,9 +18,12 @@ package org.apache.flink.connector.elasticsearch.table; +import org.apache.flink.table.api.ValidationException; + import org.junit.jupiter.api.Test; import static org.apache.flink.connector.elasticsearch.table.TestContext.context; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for validation in {@link Elasticsearch7DynamicSinkFactory}. */ public class Elasticsearch7DynamicSinkFactoryTest extends ElasticsearchDynamicSinkFactoryBaseTest { @@ -41,13 +44,14 @@ public class Elasticsearch7DynamicSinkFactoryTest extends ElasticsearchDynamicSi public void validateEmptyConfiguration() { ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); - assertValidationException( - "One or more required options are missing.\n" - + "\n" - + "Missing required options are:\n" - + "\n" - + "hosts\n" - + "index", - () -> sinkFactory.createDynamicTableSink(context().build())); + assertThatThrownBy(() -> sinkFactory.createDynamicTableSink(context().build())) + .isInstanceOf(ValidationException.class) + .hasMessage( + "One or more required options are missing.\n" + + "\n" + + "Missing required options are:\n" + + "\n" + + "hosts\n" + + "index"); } }