This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

commit d226fc3e8f36870e82fe860a8e3e46f9d82e7b8c
Author: Alexander Preuß <11444089+alp...@users.noreply.github.com>
AuthorDate: Mon Sep 12 15:43:52 2022 +0200

    [FLINK-27185] Convert connector-elasticsearch modules to assertj
---
 .../sink/ElasticsearchSinkBaseITCase.java          |   2 +-
 .../sink/ElasticsearchSinkBuilderBaseTest.java     |  10 +-
 .../sink/ElasticsearchWriterITCase.java            |   4 +-
 .../elasticsearch/sink/TestClientBase.java         |   4 +-
 .../ElasticsearchDynamicSinkFactoryBaseTest.java   | 278 ++++++++++-----------
 .../elasticsearch/ElasticsearchSinkBaseTest.java   | 108 ++------
 .../elasticsearch/ElasticsearchSinkTestBase.java   |  51 ++--
 .../table/IndexGeneratorFactoryTest.java           |  95 ++++---
 .../testutils/SourceSinkDataTestKit.java           |   5 +-
 .../Elasticsearch6DynamicSinkFactoryTest.java      |  22 +-
 .../Elasticsearch7DynamicSinkFactoryTest.java      |  20 +-
 11 files changed, 256 insertions(+), 343 deletions(-)

diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java
index 65c2af3..bd574d8 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java
@@ -123,7 +123,7 @@ abstract class ElasticsearchSinkBaseITCase {
             runTest(index, false, TestEmitter::jsonEmitter, deliveryGuarantee, 
null);
         } catch (IllegalStateException e) {
             failure = true;
-            
assertThat(deliveryGuarantee).isEqualTo(DeliveryGuarantee.EXACTLY_ONCE);
+            
assertThat(deliveryGuarantee).isSameAs(DeliveryGuarantee.EXACTLY_ONCE);
         } finally {
             assertThat(failure).isEqualTo(deliveryGuarantee == 
DeliveryGuarantee.EXACTLY_ONCE);
         }
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
index 8e7d096..3fc2abc 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
@@ -30,7 +30,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link ElasticsearchSinkBuilderBase}. */
@@ -54,7 +54,7 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends 
ElasticsearchSinkBuild
         return DynamicTest.stream(
                 validBuilders,
                 ElasticsearchSinkBuilderBase::toString,
-                builder -> assertThatNoException().isThrownBy(builder::build));
+                builder -> 
assertThatCode(builder::build).doesNotThrowAnyException());
     }
 
     @Test
@@ -91,11 +91,11 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends 
ElasticsearchSinkBuild
 
     @Test
     void testThrowIfSetInvalidTimeouts() {
-        assertThatThrownBy(() -> 
createEmptyBuilder().setConnectionRequestTimeout(-1))
+        assertThatThrownBy(() -> 
createEmptyBuilder().setConnectionRequestTimeout(-1).build())
                 .isInstanceOf(IllegalStateException.class);
-        assertThatThrownBy(() -> createEmptyBuilder().setConnectionTimeout(-1))
+        assertThatThrownBy(() -> 
createEmptyBuilder().setConnectionTimeout(-1).build())
                 .isInstanceOf(IllegalStateException.class);
-        assertThatThrownBy(() -> createEmptyBuilder().setSocketTimeout(-1))
+        assertThatThrownBy(() -> 
createEmptyBuilder().setSocketTimeout(-1).build())
                 .isInstanceOf(IllegalStateException.class);
     }
 
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
index 0f88d57..c4e5255 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
@@ -238,7 +238,7 @@ class ElasticsearchWriterITCase {
 
             writer.blockingFlushAllActions();
 
-            assertThat(recordsSend.isPresent()).isTrue();
+            assertThat(recordsSend).isPresent();
             assertThat(recordsSend.get().getCount()).isEqualTo(3L);
         }
     }
@@ -259,7 +259,7 @@ class ElasticsearchWriterITCase {
 
             writer.blockingFlushAllActions();
 
-            assertThat(currentSendTime.isPresent()).isTrue();
+            assertThat(currentSendTime).isPresent();
             assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);
         }
     }
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestClientBase.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestClientBase.java
index 589057b..1dd70ce 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestClientBase.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestClientBase.java
@@ -42,8 +42,8 @@ abstract class TestClientBase {
             try {
                 final GetResponse response = getResponse(index, id);
                 assertThat(response.isExists())
-                        .isFalse()
-                        .as(String.format("Id %s is unexpectedly present.", 
id));
+                        .as(String.format("Id %s is unexpectedly present.", 
id))
+                        .isFalse();
             } catch (ElasticsearchStatusException e) {
                 assertThat(e.status().getStatus()).isEqualTo(404);
             }
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java
index 1b008bb..8e5a98e 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.elasticsearch.table;
 
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -31,13 +30,21 @@ import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.TestLoggerExtension;
 
-import org.assertj.core.api.ThrowableAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
 import java.util.Collections;
 
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.RAW;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -50,192 +57,164 @@ abstract class ElasticsearchDynamicSinkFactoryBaseTest {
 
     abstract TestContext createPrefilledTestContext();
 
-    void assertValidationException(
-            String expectedMessage, ThrowableAssert.ThrowingCallable 
executable) {
-        assertThatThrownBy(executable)
-                .isInstanceOf(ValidationException.class)
-                .hasMessage(expectedMessage);
-    }
-
     @Test
     public void validateWrongIndex() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
-        assertValidationException(
-                "'index' must not be empty",
-                () ->
-                        sinkFactory.createDynamicTableSink(
-                                createPrefilledTestContext()
-                                        .withOption(
-                                                
ElasticsearchConnectorOptions.INDEX_OPTION.key(),
-                                                "")
-                                        .build()));
+        assertThatThrownBy(
+                        () ->
+                                sinkFactory.createDynamicTableSink(
+                                        createPrefilledTestContext()
+                                                .withOption(
+                                                        
ElasticsearchConnectorOptions.INDEX_OPTION
+                                                                .key(),
+                                                        "")
+                                                .build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("'index' must not be empty");
     }
 
     @Test
     public void validateWrongHosts() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
-        assertValidationException(
-                "Could not parse host 'wrong-host' in option 'hosts'. It 
should follow the format 'http://host_name:port'.",
-                () ->
-                        sinkFactory.createDynamicTableSink(
-                                createPrefilledTestContext()
-                                        .withOption(
-                                                
ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
-                                                "wrong-host")
-                                        .build()));
+        assertThatThrownBy(
+                        () ->
+                                sinkFactory.createDynamicTableSink(
+                                        createPrefilledTestContext()
+                                                .withOption(
+                                                        
ElasticsearchConnectorOptions.HOSTS_OPTION
+                                                                .key(),
+                                                        "wrong-host")
+                                                .build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "Could not parse host 'wrong-host' in option 'hosts'. 
It should follow the format 'http://host_name:port'.");
     }
 
     @Test
     public void validateWrongFlushSize() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
-        assertValidationException(
-                "'sink.bulk-flush.max-size' must be in MB granularity. Got: 
1024 bytes",
-                () ->
-                        sinkFactory.createDynamicTableSink(
-                                createPrefilledTestContext()
-                                        .withOption(
-                                                ElasticsearchConnectorOptions
-                                                        
.BULK_FLUSH_MAX_SIZE_OPTION
-                                                        .key(),
-                                                "1kb")
-                                        .build()));
+        assertThatThrownBy(
+                        () ->
+                                sinkFactory.createDynamicTableSink(
+                                        createPrefilledTestContext()
+                                                .withOption(
+                                                        
ElasticsearchConnectorOptions
+                                                                
.BULK_FLUSH_MAX_SIZE_OPTION
+                                                                .key(),
+                                                        "1kb")
+                                                .build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "'sink.bulk-flush.max-size' must be in MB granularity. 
Got: 1024 bytes");
     }
 
     @Test
     public void validateWrongRetries() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
 
-        assertValidationException(
-                "'sink.bulk-flush.backoff.max-retries' must be at least 1. 
Got: 0",
-                () ->
-                        sinkFactory.createDynamicTableSink(
-                                createPrefilledTestContext()
-                                        .withOption(
-                                                ElasticsearchConnectorOptions
-                                                        
.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION
-                                                        .key(),
-                                                "0")
-                                        .build()));
+        assertThatThrownBy(
+                        () ->
+                                sinkFactory.createDynamicTableSink(
+                                        createPrefilledTestContext()
+                                                .withOption(
+                                                        
ElasticsearchConnectorOptions
+                                                                
.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION
+                                                                .key(),
+                                                        "0")
+                                                .build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("'sink.bulk-flush.backoff.max-retries' must be at 
least 1. Got: 0");
     }
 
     @Test
     public void validateWrongMaxActions() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
 
-        assertValidationException(
-                "'sink.bulk-flush.max-actions' must be at least 1. Got: -2",
-                () ->
-                        sinkFactory.createDynamicTableSink(
-                                createPrefilledTestContext()
-                                        .withOption(
-                                                ElasticsearchConnectorOptions
-                                                        
.BULK_FLUSH_MAX_ACTIONS_OPTION
-                                                        .key(),
-                                                "-2")
-                                        .build()));
+        assertThatThrownBy(
+                        () ->
+                                sinkFactory.createDynamicTableSink(
+                                        createPrefilledTestContext()
+                                                .withOption(
+                                                        
ElasticsearchConnectorOptions
+                                                                
.BULK_FLUSH_MAX_ACTIONS_OPTION
+                                                                .key(),
+                                                        "-2")
+                                                .build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("'sink.bulk-flush.max-actions' must be at least 1. 
Got: -2");
     }
 
     @Test
     public void validateWrongBackoffDelay() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
 
-        assertValidationException(
-                "Invalid value for option 'sink.bulk-flush.backoff.delay'.",
-                () ->
-                        sinkFactory.createDynamicTableSink(
-                                createPrefilledTestContext()
-                                        .withOption(
-                                                ElasticsearchConnectorOptions
-                                                        
.BULK_FLUSH_BACKOFF_DELAY_OPTION
-                                                        .key(),
-                                                "-1s")
-                                        .build()));
+        assertThatThrownBy(
+                        () ->
+                                sinkFactory.createDynamicTableSink(
+                                        createPrefilledTestContext()
+                                                .withOption(
+                                                        
ElasticsearchConnectorOptions
+                                                                
.BULK_FLUSH_BACKOFF_DELAY_OPTION
+                                                                .key(),
+                                                        "-1s")
+                                                .build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("Invalid value for option 
'sink.bulk-flush.backoff.delay'.");
     }
 
     @Test
     public void validatePrimaryKeyOnIllegalColumn() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
 
-        assertValidationException(
-                "The table has a primary key on columns of illegal types: "
-                        + "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].",
-                () ->
-                        sinkFactory.createDynamicTableSink(
-                                createPrefilledTestContext()
-                                        .withSchema(
-                                                new ResolvedSchema(
-                                                        Arrays.asList(
-                                                                
Column.physical(
-                                                                        "a",
-                                                                        
DataTypes.BIGINT()
-                                                                               
 .notNull()),
-                                                                
Column.physical(
-                                                                        "b",
-                                                                        
DataTypes.ARRAY(
-                                                                               
         DataTypes
-                                                                               
                 .BIGINT()
-                                                                               
                 .notNull())
-                                                                               
 .notNull()),
-                                                                
Column.physical(
-                                                                        "c",
-                                                                        
DataTypes.MAP(
-                                                                               
         DataTypes
-                                                                               
                 .BIGINT(),
-                                                                               
         DataTypes
-                                                                               
                 .STRING())
-                                                                               
 .notNull()),
-                                                                
Column.physical(
-                                                                        "d",
-                                                                        
DataTypes.MULTISET(
-                                                                               
         DataTypes
-                                                                               
                 .BIGINT()
-                                                                               
                 .notNull())
-                                                                               
 .notNull()),
-                                                                
Column.physical(
-                                                                        "e",
-                                                                        
DataTypes.ROW(
-                                                                               
         DataTypes
-                                                                               
                 .FIELD(
-                                                                               
                         "a",
-                                                                               
                         DataTypes
-                                                                               
                                 .BIGINT()))
-                                                                               
 .notNull()),
-                                                                
Column.physical(
-                                                                        "f",
-                                                                        
DataTypes.RAW(
-                                                                               
         Void.class,
-                                                                               
         VoidSerializer
-                                                                               
                 .INSTANCE)
-                                                                               
 .notNull()),
-                                                                
Column.physical(
-                                                                        "g",
-                                                                        
DataTypes.BYTES()
-                                                                               
 .notNull())),
-                                                        
Collections.emptyList(),
-                                                        
UniqueConstraint.primaryKey(
-                                                                "name",
-                                                                Arrays.asList(
-                                                                        "a", 
"b", "c", "d", "e",
-                                                                        "f", 
"g"))))
-                                        .build()));
+        ResolvedSchema resolvedSchema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", BIGINT().notNull()),
+                                Column.physical("b", 
ARRAY(BIGINT().notNull()).notNull()),
+                                Column.physical("c", MAP(BIGINT(), 
STRING()).notNull()),
+                                Column.physical("d", 
MULTISET(BIGINT().notNull()).notNull()),
+                                Column.physical("e", ROW(FIELD("a", 
BIGINT())).notNull()),
+                                Column.physical(
+                                        "f", RAW(Void.class, 
VoidSerializer.INSTANCE).notNull()),
+                                Column.physical("g", BYTES().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey(
+                                "name", Arrays.asList("a", "b", "c", "d", "e", 
"f", "g")));
+
+        assertThatThrownBy(
+                        () ->
+                                sinkFactory.createDynamicTableSink(
+                                        createPrefilledTestContext()
+                                                .withSchema(resolvedSchema)
+                                                .build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "The table has a primary key on columns of illegal 
types: "
+                                + "[ARRAY, MAP, MULTISET, ROW, RAW, 
VARBINARY].");
     }
 
     @Test
     public void validateWrongCredential() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
 
-        assertValidationException(
-                "'username' and 'password' must be set at the same time. Got: 
username 'username' and password ''",
-                () ->
-                        sinkFactory.createDynamicTableSink(
-                                createPrefilledTestContext()
-                                        .withOption(
-                                                
ElasticsearchConnectorOptions.USERNAME_OPTION.key(),
-                                                "username")
-                                        .withOption(
-                                                
ElasticsearchConnectorOptions.PASSWORD_OPTION.key(),
-                                                "")
-                                        .build()));
+        assertThatThrownBy(
+                        () ->
+                                sinkFactory.createDynamicTableSink(
+                                        createPrefilledTestContext()
+                                                .withOption(
+                                                        
ElasticsearchConnectorOptions
+                                                                
.USERNAME_OPTION
+                                                                .key(),
+                                                        "username")
+                                                .withOption(
+                                                        
ElasticsearchConnectorOptions
+                                                                
.PASSWORD_OPTION
+                                                                .key(),
+                                                        "")
+                                                .build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "'username' and 'password' must be set at the same 
time. Got: username 'username' and password ''");
     }
 
     @Test
@@ -254,9 +233,10 @@ abstract class ElasticsearchDynamicSinkFactoryBaseTest {
                         .addContainedKind(RowKind.DELETE)
                         .addContainedKind(RowKind.INSERT)
                         .build();
-        assertValidationException(
-                "Dynamic indexing based on system time only works on append 
only stream.",
-                () -> sink.getChangelogMode(changelogMode));
+        assertThatThrownBy(() -> sink.getChangelogMode(changelogMode))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "Dynamic indexing based on system time only works on 
append only stream.");
     }
 
     @Test
@@ -271,6 +251,6 @@ abstract class ElasticsearchDynamicSinkFactoryBaseTest {
         ElasticsearchDynamicSink esSink = (ElasticsearchDynamicSink) sink;
         SinkV2Provider provider =
                 (SinkV2Provider) esSink.getSinkRuntimeProvider(new 
ElasticsearchUtil.MockContext());
-        assertThat(2).isEqualTo(provider.getParallelism().get());
+        assertThat(provider.getParallelism()).hasValue(2);
     }
 }
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 98c4093..33675d1 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -36,7 +36,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
-import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -50,6 +49,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -104,17 +105,9 @@ public class ElasticsearchSinkBaseTest {
         // manually execute the next bulk request
         sink.manualBulkRequestWithAllPendingRequests();
 
-        try {
-            testHarness.processElement(new StreamRecord<>("next msg"));
-        } catch (Exception e) {
-            // the invoke should have failed with the failure
-            Assert.assertTrue(e.getCause().getMessage().contains("artificial 
failure for record"));
-
-            // test succeeded
-            return;
-        }
-
-        Assert.fail();
+        assertThatThrownBy(() -> testHarness.processElement(new 
StreamRecord<>("next msg")))
+                .getCause()
+                .hasMessageContaining("artificial failure for record");
     }
 
     /**
@@ -143,18 +136,10 @@ public class ElasticsearchSinkBaseTest {
         // manually execute the next bulk request
         sink.manualBulkRequestWithAllPendingRequests();
 
-        try {
-            testHarness.snapshot(1L, 1000L);
-        } catch (Exception e) {
-            // the snapshot should have failed with the failure
-            Assert.assertTrue(
-                    e.getCause().getCause().getMessage().contains("artificial 
failure for record"));
-
-            // test succeeded
-            return;
-        }
-
-        Assert.fail();
+        assertThatThrownBy(() -> testHarness.snapshot(1L, 1000L))
+                .getCause()
+                .getCause()
+                .hasMessageContaining("artificial failure for record");
     }
 
     /**
@@ -213,18 +198,10 @@ public class ElasticsearchSinkBaseTest {
         // fail)
         sink.continueFlush();
 
-        try {
-            snapshotThread.sync();
-        } catch (Exception e) {
-            // the snapshot should have failed with the failure from the 2nd 
request
-            Assert.assertTrue(
-                    e.getCause().getCause().getMessage().contains("artificial 
failure for record"));
-
-            // test succeeded
-            return;
-        }
-
-        Assert.fail();
+        assertThatThrownBy(snapshotThread::sync)
+                .getCause()
+                .getCause()
+                .hasMessageContaining("artificial failure for record");
     }
 
     /**
@@ -252,18 +229,9 @@ public class ElasticsearchSinkBaseTest {
         // manually execute the next bulk request
         sink.manualBulkRequestWithAllPendingRequests();
 
-        try {
-            testHarness.processElement(new StreamRecord<>("next msg"));
-        } catch (Exception e) {
-            // the invoke should have failed with the bulk request failure
-            Assert.assertTrue(
-                    e.getCause().getMessage().contains("artificial failure for 
bulk request"));
-
-            // test succeeded
-            return;
-        }
-
-        Assert.fail();
+        assertThatThrownBy(() -> testHarness.processElement(new 
StreamRecord<>("next msg")))
+                .getCause()
+                .hasMessageContaining("artificial failure for bulk request");
     }
 
     /**
@@ -291,21 +259,10 @@ public class ElasticsearchSinkBaseTest {
         // manually execute the next bulk request
         sink.manualBulkRequestWithAllPendingRequests();
 
-        try {
-            testHarness.snapshot(1L, 1000L);
-        } catch (Exception e) {
-            // the snapshot should have failed with the bulk request failure
-            Assert.assertTrue(
-                    e.getCause()
-                            .getCause()
-                            .getMessage()
-                            .contains("artificial failure for bulk request"));
-
-            // test succeeded
-            return;
-        }
-
-        Assert.fail();
+        assertThatThrownBy(() -> testHarness.snapshot(1L, 1000L))
+                .getCause()
+                .getCause()
+                .hasMessageContaining("artificial failure for bulk request");
     }
 
     /**
@@ -360,21 +317,10 @@ public class ElasticsearchSinkBaseTest {
         // let the snapshot-triggered flush continue (bulk request should fail 
completely)
         sink.continueFlush();
 
-        try {
-            snapshotThread.sync();
-        } catch (Exception e) {
-            // the snapshot should have failed with the bulk request failure
-            Assert.assertTrue(
-                    e.getCause()
-                            .getCause()
-                            .getMessage()
-                            .contains("artificial failure for bulk request"));
-
-            // test succeeded
-            return;
-        }
-
-        Assert.fail();
+        assertThatThrownBy(snapshotThread::sync)
+                .getCause()
+                .getCause()
+                .hasMessageContaining("artificial failure for bulk request");
     }
 
     /**
@@ -427,7 +373,7 @@ public class ElasticsearchSinkBaseTest {
         }
 
         // current number of pending request should be 1 due to the re-add
-        Assert.assertEquals(1, sink.getNumPendingRequests());
+        assertThat(sink.getNumPendingRequests()).isEqualTo(1);
 
         // this time, let the bulk request succeed, so no-more requests are 
re-added
         sink.setMockItemFailuresListForNextBulkItemResponses(
@@ -482,8 +428,8 @@ public class ElasticsearchSinkBaseTest {
         sink.open(mock(Configuration.class));
         sink.close();
 
-        Assert.assertTrue(sinkFunction.openCalled);
-        Assert.assertTrue(sinkFunction.closeCalled);
+        assertThat(sinkFunction.openCalled).isTrue();
+        assertThat(sinkFunction.closeCalled).isTrue();
     }
 
     private static class DummyElasticsearchSink<T> extends 
ElasticsearchSinkBase<T, Client> {
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index 84342fc..5c3d4ec 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -32,7 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Environment preparation and suite of tests for version-specific {@link 
ElasticsearchSinkBase}
@@ -100,33 +100,28 @@ public abstract class ElasticsearchSinkTestBase<C extends 
AutoCloseable, A>
      * null}.
      */
     public void runNullAddressesTest() {
-        try {
-            createElasticsearchSink(
-                    1, getClusterName(), null, 
SourceSinkDataTestKit.getJsonSinkFunction("test"));
-        } catch (IllegalArgumentException | NullPointerException 
expectedException) {
-            // test passes
-            return;
-        }
-
-        fail();
+        assertThatThrownBy(
+                        () ->
+                                createElasticsearchSink(
+                                        1,
+                                        getClusterName(),
+                                        null,
+                                        
SourceSinkDataTestKit.getJsonSinkFunction("test")))
+                .isInstanceOfAny(IllegalArgumentException.class, 
NullPointerException.class);
     }
 
     /**
      * Tests that the Elasticsearch sink fails eagerly if the provided list of 
addresses is empty.
      */
     public void runEmptyAddressesTest() {
-        try {
-            createElasticsearchSink(
-                    1,
-                    getClusterName(),
-                    Collections.emptyList(),
-                    SourceSinkDataTestKit.getJsonSinkFunction("test"));
-        } catch (IllegalArgumentException expectedException) {
-            // test passes
-            return;
-        }
-
-        fail();
+        assertThatThrownBy(
+                        () ->
+                                createElasticsearchSink(
+                                        1,
+                                        getClusterName(),
+                                        Collections.emptyList(),
+                                        
SourceSinkDataTestKit.getJsonSinkFunction("test")))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     /** Tests whether the Elasticsearch sink fails when there is no cluster to 
connect to. */
@@ -143,16 +138,8 @@ public abstract class ElasticsearchSinkTestBase<C extends 
AutoCloseable, A>
                         SourceSinkDataTestKit.getJsonSinkFunction("test"),
                         "123.123.123.123")); // incorrect ip address
 
-        try {
-            env.execute("Elasticsearch Sink Test");
-        } catch (JobExecutionException expectedException) {
-            // every ES version throws a different exception in case of 
timeouts, so don't bother
-            // asserting on the exception
-            // test passes
-            return;
-        }
-
-        fail();
+        assertThatThrownBy(() -> env.execute("Elasticsearch Sink Test"))
+                .isInstanceOf(JobExecutionException.class);
     }
 
     /** Utility method to create a user config map. */
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
index a5f7759..5941a0d 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -37,11 +36,12 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.format.DateTimeFormatter;
-import java.time.temporal.UnsupportedTemporalTypeException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Tests for {@link IndexGeneratorFactory}. */
 public class IndexGeneratorFactoryTest extends TestLogger {
 
@@ -91,12 +91,12 @@ public class IndexGeneratorFactoryTest extends TestLogger {
                 IndexGeneratorFactory.createIndexGenerator(
                         "{order_timestamp|yyyy_MM_dd_HH-ss}_index", schema);
         indexGenerator.open();
-        Assert.assertEquals("2020_03_18_12-14_index", 
indexGenerator.generate(rows.get(0)));
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("2020_03_18_12-14_index");
         IndexGenerator indexGenerator1 =
                 IndexGeneratorFactory.createIndexGenerator(
                         "{order_timestamp|yyyy_MM_dd_HH_mm}_index", schema);
         indexGenerator1.open();
-        Assert.assertEquals("2020_03_19_12_22_index", 
indexGenerator1.generate(rows.get(1)));
+        
assertThat(indexGenerator1.generate(rows.get(1))).isEqualTo("2020_03_19_12_22_index");
     }
 
     @Test
@@ -105,8 +105,8 @@ public class IndexGeneratorFactoryTest extends TestLogger {
                 IndexGeneratorFactory.createIndexGenerator(
                         "my-index-{log_date|yyyy/MM/dd}", schema);
         indexGenerator.open();
-        Assert.assertEquals("my-index-2020/03/18", 
indexGenerator.generate(rows.get(0)));
-        Assert.assertEquals("my-index-2020/03/19", 
indexGenerator.generate(rows.get(1)));
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-2020/03/18");
+        
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020/03/19");
     }
 
     @Test
@@ -114,8 +114,8 @@ public class IndexGeneratorFactoryTest extends TestLogger {
         IndexGenerator indexGenerator =
                 
IndexGeneratorFactory.createIndexGenerator("my-index-{log_time|HH-mm}", schema);
         indexGenerator.open();
-        Assert.assertEquals("my-index-12-12", 
indexGenerator.generate(rows.get(0)));
-        Assert.assertEquals("my-index-12-22", 
indexGenerator.generate(rows.get(1)));
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-12-12");
+        
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-12-22");
     }
 
     @Test
@@ -123,8 +123,8 @@ public class IndexGeneratorFactoryTest extends TestLogger {
         IndexGenerator indexGenerator =
                 
IndexGeneratorFactory.createIndexGenerator("my-index-{log_time|}", schema);
         indexGenerator.open();
-        Assert.assertEquals("my-index-12_12_14", 
indexGenerator.generate(rows.get(0)));
-        Assert.assertEquals("my-index-12_22_21", 
indexGenerator.generate(rows.get(1)));
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-12_12_14");
+        
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-12_22_21");
     }
 
     @Test
@@ -158,9 +158,10 @@ public class IndexGeneratorFactoryTest extends TestLogger {
                             String actualIndex = 
indexGenerator.generate(rows.get(1));
                             String expectedIndex2 =
                                     "my-index-" + 
LocalDateTime.now().format(dateTimeFormatter);
-                            Assert.assertTrue(
-                                    actualIndex.equals(expectedIndex1)
-                                            || 
actualIndex.equals(expectedIndex2));
+                            assertThat(
+                                            actualIndex.equals(expectedIndex1)
+                                                    || 
actualIndex.equals(expectedIndex2))
+                                    .isTrue();
                         });
 
         List<String> invalidUseCases =
@@ -187,7 +188,7 @@ public class IndexGeneratorFactoryTest extends TestLogger {
                                                 schema);
                                 indexGenerator.open();
                             } catch (TableException e) {
-                                Assert.assertEquals(expectedExceptionMsg, 
e.getMessage());
+                                assertThat(e).hasMessage(expectedExceptionMsg);
                             }
                         });
     }
@@ -197,8 +198,8 @@ public class IndexGeneratorFactoryTest extends TestLogger {
         IndexGenerator indexGenerator =
                 
IndexGeneratorFactory.createIndexGenerator("my-index-{local_timestamp|}", 
schema);
         indexGenerator.open();
-        Assert.assertEquals("my-index-2020_03_17_19_12_14Z", 
indexGenerator.generate(rows.get(0)));
-        Assert.assertEquals("my-index-2020_03_20_03_22_14Z", 
indexGenerator.generate(rows.get(1)));
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-2020_03_17_19_12_14Z");
+        
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020_03_20_03_22_14Z");
     }
 
     @Test
@@ -206,8 +207,8 @@ public class IndexGeneratorFactoryTest extends TestLogger {
         IndexGenerator indexGenerator =
                 IndexGeneratorFactory.createIndexGenerator("index_{item}", 
schema);
         indexGenerator.open();
-        Assert.assertEquals("index_apple", 
indexGenerator.generate(rows.get(0)));
-        Assert.assertEquals("index_peanut", 
indexGenerator.generate(rows.get(1)));
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_apple");
+        
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_peanut");
     }
 
     @Test
@@ -215,8 +216,8 @@ public class IndexGeneratorFactoryTest extends TestLogger {
         IndexGenerator indexGenerator =
                 IndexGeneratorFactory.createIndexGenerator("my-index", schema);
         indexGenerator.open();
-        Assert.assertEquals("my-index", indexGenerator.generate(rows.get(0)));
-        Assert.assertEquals("my-index", indexGenerator.generate(rows.get(1)));
+        assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index");
+        assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index");
     }
 
     @Test
@@ -224,11 +225,12 @@ public class IndexGeneratorFactoryTest extends TestLogger 
{
         String expectedExceptionMsg =
                 "Unknown field 'unknown_ts' in index pattern 
'my-index-{unknown_ts|yyyy-MM-dd}',"
                         + " please check the field name.";
-        try {
-            
IndexGeneratorFactory.createIndexGenerator("my-index-{unknown_ts|yyyy-MM-dd}", 
schema);
-        } catch (TableException e) {
-            Assert.assertEquals(e.getMessage(), expectedExceptionMsg);
-        }
+        assertThatThrownBy(
+                        () ->
+                                IndexGeneratorFactory.createIndexGenerator(
+                                        "my-index-{unknown_ts|yyyy-MM-dd}", 
schema))
+                .isInstanceOf(TableException.class)
+                .hasMessage(expectedExceptionMsg);
     }
 
     @Test
@@ -236,11 +238,12 @@ public class IndexGeneratorFactoryTest extends TestLogger 
{
         String expectedExceptionMsg =
                 "Unsupported type 'INT' found in Elasticsearch dynamic index 
field, "
                         + "time-related pattern only support types are: 
DATE,TIME,TIMESTAMP.";
-        try {
-            
IndexGeneratorFactory.createIndexGenerator("my-index-{id|yyyy-MM-dd}", schema);
-        } catch (TableException e) {
-            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
-        }
+        assertThatThrownBy(
+                        () ->
+                                IndexGeneratorFactory.createIndexGenerator(
+                                        "my-index-{id|yyyy-MM-dd}", schema))
+                .isInstanceOf(TableException.class)
+                .hasMessage(expectedExceptionMsg);
     }
 
     @Test
@@ -248,23 +251,12 @@ public class IndexGeneratorFactoryTest extends TestLogger 
{
         String expectedExceptionMsg =
                 "Chaining dynamic index pattern 
my-index-{local_date}-{local_time} is not supported,"
                         + " only support single dynamic index pattern.";
-        try {
-            IndexGeneratorFactory.createIndexGenerator(
-                    "my-index-{local_date}-{local_time}", schema);
-        } catch (TableException e) {
-            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
-        }
-    }
-
-    @Test
-    public void testDynamicIndexUnsupportedFormat() {
-        String expectedExceptionMsg = "Unsupported field: HourOfDay";
-        try {
-            IndexGeneratorFactory.createIndexGenerator(
-                    "my-index-{log_date|yyyy/MM/dd HH:mm}", schema);
-        } catch (UnsupportedTemporalTypeException e) {
-            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
-        }
+        assertThatThrownBy(
+                        () ->
+                                IndexGeneratorFactory.createIndexGenerator(
+                                        "my-index-{local_date}-{local_time}", 
schema))
+                .isInstanceOf(TableException.class)
+                .hasMessage(expectedExceptionMsg);
     }
 
     @Test
@@ -273,10 +265,9 @@ public class IndexGeneratorFactoryTest extends TestLogger {
                 "Unsupported type BOOLEAN of index field, Supported types are:"
                         + " [DATE, TIME_WITHOUT_TIME_ZONE, 
TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE,"
                         + " TIMESTAMP_WITH_LOCAL_TIME_ZONE, VARCHAR, CHAR, 
TINYINT, INTEGER, BIGINT]";
-        try {
-            IndexGeneratorFactory.createIndexGenerator("index_{status}", 
schema);
-        } catch (IllegalArgumentException e) {
-            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
-        }
+        assertThatThrownBy(
+                        () -> 
IndexGeneratorFactory.createIndexGenerator("index_{status}", schema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(expectedExceptionMsg);
     }
 }
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
index 584ed4d..33023a7 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
@@ -30,13 +30,14 @@ import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
-import org.junit.Assert;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * This class contains utilities and a pre-defined source function and 
Elasticsearch Sink function
  * used to simulate and verify data used in tests.
@@ -144,7 +145,7 @@ public class SourceSinkDataTestKit {
                     client.get(
                             new GetRequest(index, TYPE_NAME, 
Integer.toString(i)),
                             RequestOptions.DEFAULT);
-            Assert.assertEquals(DATA_PREFIX + i, 
response.getSource().get(DATA_FIELD_NAME));
+            
assertThat(response.getSource().get(DATA_FIELD_NAME)).isEqualTo(DATA_PREFIX + 
i);
         }
     }
 
diff --git 
a/flink-connector-elasticsearch6/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
 
b/flink-connector-elasticsearch6/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
index d8053d4..502ad72 100644
--- 
a/flink-connector-elasticsearch6/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
+++ 
b/flink-connector-elasticsearch6/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.connector.elasticsearch.table;
 
+import org.apache.flink.table.api.ValidationException;
+
 import org.junit.jupiter.api.Test;
 
 import static 
org.apache.flink.connector.elasticsearch.table.TestContext.context;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for validation in {@link Elasticsearch6DynamicSinkFactory}. */
 public class Elasticsearch6DynamicSinkFactoryTest extends 
ElasticsearchDynamicSinkFactoryBaseTest {
@@ -42,14 +45,15 @@ public class Elasticsearch6DynamicSinkFactoryTest extends 
ElasticsearchDynamicSi
     public void validateEmptyConfiguration() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
 
-        assertValidationException(
-                "One or more required options are missing.\n"
-                        + "\n"
-                        + "Missing required options are:\n"
-                        + "\n"
-                        + "document-type\n"
-                        + "hosts\n"
-                        + "index",
-                () -> sinkFactory.createDynamicTableSink(context().build()));
+        assertThatThrownBy(() -> 
sinkFactory.createDynamicTableSink(context().build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "One or more required options are missing.\n"
+                                + "\n"
+                                + "Missing required options are:\n"
+                                + "\n"
+                                + "document-type\n"
+                                + "hosts\n"
+                                + "index");
     }
 }
diff --git 
a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
 
b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
index e824772..26c0df1 100644
--- 
a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
+++ 
b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.connector.elasticsearch.table;
 
+import org.apache.flink.table.api.ValidationException;
+
 import org.junit.jupiter.api.Test;
 
 import static 
org.apache.flink.connector.elasticsearch.table.TestContext.context;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for validation in {@link Elasticsearch7DynamicSinkFactory}. */
 public class Elasticsearch7DynamicSinkFactoryTest extends 
ElasticsearchDynamicSinkFactoryBaseTest {
@@ -41,13 +44,14 @@ public class Elasticsearch7DynamicSinkFactoryTest extends 
ElasticsearchDynamicSi
     public void validateEmptyConfiguration() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
 
-        assertValidationException(
-                "One or more required options are missing.\n"
-                        + "\n"
-                        + "Missing required options are:\n"
-                        + "\n"
-                        + "hosts\n"
-                        + "index",
-                () -> sinkFactory.createDynamicTableSink(context().build()));
+        assertThatThrownBy(() -> 
sinkFactory.createDynamicTableSink(context().build()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "One or more required options are missing.\n"
+                                + "\n"
+                                + "Missing required options are:\n"
+                                + "\n"
+                                + "hosts\n"
+                                + "index");
     }
 }

Reply via email to