This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit a1b4da53f4a73eaa22e5dbb4d22e0ed897ec4f0a Author: Joao Boto <b...@boto.pro> AuthorDate: Sun May 26 02:45:25 2019 +0200 [BAHIR-200] Move tests from docker to kudu-test-utils (#49) --- flink-connector-kudu/dockers/docker-compose.yml | 92 ---------------------- flink-connector-kudu/dockers/role/Dockerfile | 41 ---------- .../dockers/role/docker-entrypoint.sh | 69 ---------------- flink-connector-kudu/dockers/run_kudu_tests.sh | 68 ---------------- flink-connector-kudu/dockers/start-images.sh | 42 ---------- flink-connector-kudu/dockers/stop-images.sh | 33 -------- flink-connector-kudu/pom.xml | 42 +++++++--- .../connectors/kudu/connector/KuduFilterInfo.java | 4 +- .../streaming/connectors/kudu/DockerTest.java | 31 -------- .../connectors/kudu/KuduInputFormatTest.java | 8 +- .../connectors/kudu/KuduOuputFormatTest.java | 13 +-- .../streaming/connectors/kudu/KuduSinkTest.java | 29 +++++-- .../connectors/kudu/connector/KuduDatabase.java | 16 +++- 13 files changed, 82 insertions(+), 406 deletions(-) diff --git a/flink-connector-kudu/dockers/docker-compose.yml b/flink-connector-kudu/dockers/docker-compose.yml deleted file mode 100644 index d2c95bb..0000000 --- a/flink-connector-kudu/dockers/docker-compose.yml +++ /dev/null @@ -1,92 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -version: '2' - -services: - - kudu-master: - image: eskabetxe/kudu - container_name: kudu-master - hostname: 172.25.0.6 - ports: - - "8051:8051" - volumes: - - /var/lib/kudu/master - command: master - networks: - mynet: - ipv4_address: 172.25.0.6 - - kudu-server1: - image: eskabetxe/kudu - container_name: kudu-server1 - hostname: 172.25.0.7 - environment: - - KUDU_MASTER=172.25.0.6 - ports: - - "8054:8050" - volumes: - - /var/lib/kudu/server - command: tserver - networks: - mynet: - ipv4_address: 172.25.0.7 - links: - - kudu-master - - kudu-server2: - image: eskabetxe/kudu - container_name: kudu-server2 - hostname: 172.25.0.8 - environment: - - KUDU_MASTER=172.25.0.6 - ports: - - "8052:8050" - volumes: - - /var/lib/kudu/server - command: tserver - networks: - mynet: - ipv4_address: 172.25.0.8 - links: - - kudu-master - - kudu-server3: - image: eskabetxe/kudu - container_name: kudu-server3 - hostname: 172.25.0.9 - environment: - - KUDU_MASTER=172.25.0.6 - ports: - - "8053:8050" - volumes: - - /var/lib/kudu/server - command: tserver - networks: - mynet: - ipv4_address: 172.25.0.9 - links: - - kudu-master - -networks: - mynet: - driver: bridge - ipam: - config: - - subnet: 172.25.0.0/24 - IPRange: 172.25.0.2/24, - gateway: 172.25.0.1 diff --git a/flink-connector-kudu/dockers/role/Dockerfile b/flink-connector-kudu/dockers/role/Dockerfile deleted file mode 100644 index b14b087..0000000 --- a/flink-connector-kudu/dockers/role/Dockerfile +++ /dev/null @@ -1,41 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -FROM bitnami/minideb:jessie -MAINTAINER eskabetxe - -RUN set -x \ - && apt-get update \ - && apt-get install -y --no-install-recommends \ - bzip2 unzip xz-utils wget \ - && cd /etc/apt/sources.list.d \ - && wget -qO - http://archive.cloudera.com/kudu/debian/jessie/amd64/kudu/archive.key | apt-key add - \ - && wget http://archive.cloudera.com/kudu/debian/jessie/amd64/kudu/cloudera.list \ - && apt-get update \ - && apt-get install --no-install-recommends -y \ - kudu kudu-master kudu-tserver libkuduclient0 libkuduclient-dev \ - && rm -rf /var/lib/apt/lists/* \ - && apt-get autoclean - -VOLUME /var/lib/kudu/master /var/lib/kudu/tserver - -COPY docker-entrypoint.sh / -RUN chmod a+x /docker-entrypoint.sh - -ENTRYPOINT ["/docker-entrypoint.sh"] -EXPOSE 8050 8051 7050 7051 -#CMD ["help"] diff --git a/flink-connector-kudu/dockers/role/docker-entrypoint.sh b/flink-connector-kudu/dockers/role/docker-entrypoint.sh deleted file mode 100644 index 770850c..0000000 --- a/flink-connector-kudu/dockers/role/docker-entrypoint.sh +++ /dev/null @@ -1,69 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -set -e - -function do_help { - echo HELP: - echo "Supported commands:" - echo " master - Start a Kudu Master" - echo " tserver - Start a Kudu TServer" - echo " single - Start a Kudu Master+TServer in one container" - echo " kudu - Run the Kudu CLI" - echo " help - print useful information and exit"l - echo "" - echo "Other commands can be specified to run shell commands." - echo "Set the environment variable KUDU_OPTS to pass additional" - echo "arguments to the kudu process. DEFAULT_KUDU_OPTS contains" - echo "a recommended base set of options." - - exit 0 -} - -DEFAULT_KUDU_OPTS="-logtostderr \ - -fs_wal_dir=/var/lib/kudu/$1 \ - -fs_data_dirs=/var/lib/kudu/$1 \ - -use_hybrid_clock=false" - -KUDU_OPTS=${KUDU_OPTS:-${DEFAULT_KUDU_OPTS}} - -if [ "$1" = 'master' ]; then - exec kudu-master -fs_wal_dir /var/lib/kudu/master ${KUDU_OPTS} -elif [ "$1" = 'tserver' ]; then - exec kudu-tserver -fs_wal_dir /var/lib/kudu/tserver \ - -tserver_master_addrs ${KUDU_MASTER} ${KUDU_OPTS} -elif [ "$1" = 'single' ]; then - KUDU_MASTER=boot2docker - KUDU_MASTER_OPTS="-logtostderr \ - -fs_wal_dir=/var/lib/kudu/master \ - -fs_data_dirs=/var/lib/kudu/master \ - -use_hybrid_clock=false" - KUDU_TSERVER_OPTS="-logtostderr \ - -fs_wal_dir=/var/lib/kudu/tserver \ - -fs_data_dirs=/var/lib/kudu/tserver \ - -use_hybrid_clock=false" - exec kudu-master -fs_wal_dir /var/lib/kudu/master ${KUDU_MASTER_OPTS} & - sleep 5 - exec kudu-tserver -fs_wal_dir /var/lib/kudu/tserver -tserver_master_addrs ${KUDU_MASTER} ${KUDU_TSERVER_OPTS} -elif [ "$1" = 'kudu' ]; then - shift; # Remove first arg and pass remainder to kudu cli - exec kudu "$@" -elif [ "$1" = 'help' ]; then - do_help -fi - -exec "$@" \ No newline at end of file diff --git a/flink-connector-kudu/dockers/run_kudu_tests.sh b/flink-connector-kudu/dockers/run_kudu_tests.sh deleted file mode 100755 index 58593d6..0000000 --- a/flink-connector-kudu/dockers/run_kudu_tests.sh +++ /dev/null @@ -1,68 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# Runs all tests with Kudu server in docker containers. - -set -euo pipefail -x - -# http://stackoverflow.com/questions/3572030/bash-script-absolute-path-with-osx -function absolutepath() { - [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" -} - -SCRIPT_DIR=$(dirname $(absolutepath "$0")) - -PROJECT_ROOT="${SCRIPT_DIR}/../.." - -DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml" - - -function build_image() { - docker build -t eskabetxe/kudu ${SCRIPT_DIR}/role - - #docker-compose build -f "${DOCKER_COMPOSE_LOCATION}" -} - -function start_docker_container() { - # stop already running containers - #docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down || true - - # start containers - docker-compose -f "${DOCKER_COMPOSE_LOCATION}" up -d -} - -function cleanup_docker_container() { - docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down - #true -} - -build_image - -start_docker_container - -#run product tests -pushd ${PROJECT_ROOT} -set +e -mvn test -pl flink-connector-kudu -P test-kudu -EXIT_CODE=$? -set -e -popd - -cleanup_docker_container - -exit ${EXIT_CODE} \ No newline at end of file diff --git a/flink-connector-kudu/dockers/start-images.sh b/flink-connector-kudu/dockers/start-images.sh deleted file mode 100755 index fad3de6..0000000 --- a/flink-connector-kudu/dockers/start-images.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -function absolutepath() { - [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" -} - -function build_image() { - docker build -t eskabetxe/kudu ${SCRIPT_DIR}/role - - #docker-compose build -f "${DOCKER_COMPOSE_LOCATION}" -} - -function start_docker_container() { - # stop already running containers - docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down || true - - # start containers - docker-compose -f "${DOCKER_COMPOSE_LOCATION}" up -d -} - -SCRIPT_DIR=$(dirname $(absolutepath "$0")) -DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml" - -build_image - -start_docker_container diff --git a/flink-connector-kudu/dockers/stop-images.sh b/flink-connector-kudu/dockers/stop-images.sh deleted file mode 100755 index 9ae52c1..0000000 --- a/flink-connector-kudu/dockers/stop-images.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -function absolutepath() { - [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" -} - -function cleanup_docker_container() { - docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down - - docker rm eskabetxe/kudu -} - -SCRIPT_DIR=$(dirname $(absolutepath "$0")) -DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml" - -cleanup_docker_container - diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index d51341b..a504b89 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -32,6 +32,7 @@ <properties> <kudu.version>1.9.0</kudu.version> + <mockito.version>1.10.19</mockito.version> <junit.groups>!DockerTest</junit.groups> </properties> @@ -50,24 +51,43 @@ </dependency> <!--test dependencies--> - <dependency> <groupId>org.apache.kudu</groupId> - <artifactId>kudu-client</artifactId> + <artifactId>kudu-test-utils</artifactId> <version>${kudu.version}</version> - <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.kudu</groupId> + <artifactId>kudu-binary</artifactId> + <version>${kudu.version}</version> + <classifier>${os.detected.classifier}</classifier> + <scope>test</scope> + </dependency> + <!-- this is added because test cluster use @Rule from junit4 --> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-migrationsupport</artifactId> + <version>${junit.jupiter.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> </dependencies> - <profiles> - <profile> - <id>docker-test</id> - <properties> - <junit.groups>DockerTest</junit.groups> - </properties> - </profile> - </profiles> + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.6.2</version> + </extension> + </extensions> + </build> </project> diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java index bd20fc8..1a7582d 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java @@ -38,7 +38,7 @@ public class KuduFilterInfo { KuduPredicate predicate; switch (this.type) { case IS_IN: - predicate = KuduPredicate.newInListPredicate(column, (List) this.value); + predicate = KuduPredicate.newInListPredicate(column, (List<?>) this.value); break; case IS_NULL: predicate = KuduPredicate.newIsNullPredicate(column); @@ -137,7 +137,7 @@ public class KuduFilterInfo { return filter(FilterType.IS_NULL, null); } - public Builder isIn(List values) { + public Builder isIn(List<?> values) { return filter(FilterType.IS_IN, values); } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/DockerTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/DockerTest.java deleted file mode 100644 index 070e634..0000000 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/DockerTest.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kudu; - -import org.junit.jupiter.api.Tag; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@Target({ ElementType.TYPE, ElementType.METHOD }) -@Retention(RetentionPolicy.RUNTIME) -@Tag("DockerTest") -public @interface DockerTest { -} - diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java index eb9dc00..041b77e 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java @@ -26,7 +26,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -@DockerTest + public class KuduInputFormatTest extends KuduDatabase { @Test @@ -37,7 +37,8 @@ public class KuduInputFormatTest extends KuduDatabase { @Test public void testInvalidTableInfo() throws IOException { - Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(hostsCluster, null)); + String masterAddresses = harness.getMasterAddressesAsString(); + Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(masterAddresses, null)); } @Test @@ -68,7 +69,8 @@ public class KuduInputFormatTest extends KuduDatabase { public static List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception { - KuduInputFormat inputFormat = new KuduInputFormat(hostsCluster, tableInfo) + String masterAddresses = harness.getMasterAddressesAsString(); + KuduInputFormat inputFormat = new KuduInputFormat(masterAddresses, tableInfo) .withTableProjections(fieldProjection); KuduInputFormat.KuduInputSplit[] splits = inputFormat.createInputSplits(1); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java index 35982f4..b9aaa40 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.List; import java.util.UUID; -@DockerTest public class KuduOuputFormatTest extends KuduDatabase { @Test @@ -38,21 +37,24 @@ public class KuduOuputFormatTest extends KuduDatabase { @Test public void testInvalidTableInfo() throws IOException { - Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null, new DefaultSerDe())); + String masterAddresses = harness.getMasterAddressesAsString(); + Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(masterAddresses, null, new DefaultSerDe())); } @Test public void testNotTableExist() throws IOException { + String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); - KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe()); + KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()); Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1)); } @Test public void testOutputWithStrongConsistency() throws Exception { + String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe()) + KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()) .withStrongConsistency(); outputFormat.open(0,1); @@ -69,9 +71,10 @@ public class KuduOuputFormatTest extends KuduDatabase { @Test public void testOutputWithEventualConsistency() throws Exception { + String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo, new DefaultSerDe()) + KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe()) .withEventualConsistency(); outputFormat.open(0,1); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java index 3ca9b9a..83e060d 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java @@ -17,20 +17,31 @@ package org.apache.flink.streaming.connectors.kudu; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase; import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.streaming.connectors.kudu.serde.DefaultSerDe; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.io.IOException; import java.util.List; import java.util.UUID; -@DockerTest + public class KuduSinkTest extends KuduDatabase { + private static StreamingRuntimeContext context; + + @BeforeAll + public static void start() { + context = Mockito.mock(StreamingRuntimeContext.class); + Mockito.when(context.isCheckpointingEnabled()).thenReturn(true); + } + @Test public void testInvalidKuduMaster() throws IOException { KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); @@ -39,22 +50,27 @@ public class KuduSinkTest extends KuduDatabase { @Test public void testInvalidTableInfo() throws IOException { - Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null, new DefaultSerDe())); + String masterAddresses = harness.getMasterAddressesAsString(); + Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(masterAddresses, null, new DefaultSerDe())); } @Test public void testNotTableExist() throws IOException { + String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false); - KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe()); + KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()); + sink.setRuntimeContext(context); Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration())); } @Test public void testOutputWithStrongConsistency() throws Exception { + String masterAddresses = harness.getMasterAddressesAsString(); KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe()) + KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()) .withStrongConsistency(); + sink.setRuntimeContext(context); sink.open(new Configuration()); for (KuduRow kuduRow : booksDataRow()) { @@ -69,9 +85,12 @@ public class KuduSinkTest extends KuduDatabase { @Test public void testOutputWithEventualConsistency() throws Exception { + String masterAddresses = harness.getMasterAddressesAsString(); + KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true); - KuduSink sink = new KuduSink<>(hostsCluster, tableInfo, new DefaultSerDe()) + KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe()) .withEventualConsistency(); + sink.setRuntimeContext(context); sink.open(new Configuration()); for (KuduRow kuduRow : booksDataRow()) { diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java index 99efbd1..d22203d 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java @@ -17,15 +17,21 @@ package org.apache.flink.streaming.connectors.kudu.connector; import org.apache.kudu.Type; +import org.apache.kudu.test.KuduTestHarness; +import org.junit.Rule; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +@ExtendWith(ExternalResourceSupport.class) public class KuduDatabase { - protected static final String hostsCluster = "172.25.0.6"; + @Rule + public static KuduTestHarness harness = new KuduTestHarness(); protected static final Object[][] booksTableData = { {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11}, @@ -64,7 +70,8 @@ public class KuduDatabase { public void setUpDatabase(KuduTableInfo tableInfo) { try { - KuduConnector tableContext = new KuduConnector(hostsCluster, tableInfo); + String masterAddresses = harness.getMasterAddressesAsString(); + KuduConnector tableContext = new KuduConnector(masterAddresses, tableInfo); booksDataRow().forEach(row -> { try { tableContext.writeRow(row); @@ -77,9 +84,10 @@ public class KuduDatabase { } } - protected static void cleanDatabase(KuduTableInfo tableInfo) { + protected void cleanDatabase(KuduTableInfo tableInfo) { try { - KuduConnector tableContext = new KuduConnector(hostsCluster, tableInfo); + String masterAddresses = harness.getMasterAddressesAsString(); + KuduConnector tableContext = new KuduConnector(masterAddresses, tableInfo); tableContext.deleteTable(); tableContext.close(); } catch (Exception e) {