dannycranmer commented on a change in pull request #18603: URL: https://github.com/apache/flink/pull/18603#discussion_r804518041
########## File path: flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtilsTest.java ########## @@ -28,51 +28,50 @@ import java.util.Map; import java.util.Properties; -/** Unit tests for {@link KinesisAsyncClientOptionsUtils}. */ -public class KinesisAsyncClientOptionsUtilsTest extends TestLogger { +/** Unit tests for {@link AsyncClientOptionsUtils}. */ +public class AsyncClientOptionsUtilsTest extends TestLogger { @Test public void testGoodKinesisClientOptionsMapping() { Map<String, String> defaultClientOptions = getDefaultClientOptions(); - KinesisAsyncClientOptionsUtils kinesisAsyncClientOptionsUtils = - new KinesisAsyncClientOptionsUtils(defaultClientOptions); + AsyncClientOptionsUtils asyncClientOptionsUtils = + new AsyncClientOptionsUtils(defaultClientOptions); Map<String, String> expectedConfigurations = getDefaultExpectedClientOptions(); Map<String, String> actualConfigurations = - kinesisAsyncClientOptionsUtils.getProcessedResolvedOptions(); + asyncClientOptionsUtils.getProcessedResolvedOptions(); Assertions.assertThat(actualConfigurations).isEqualTo(expectedConfigurations); } @Test public void testGoodKinesisClientOptionsSelectionAndMapping() { Review comment: This test name did not explain the purpose of the test. From reading it I believe that `"sink.not.http-client.some.option"` has been removed from the `getProcessedResolvedOptions` ? Can we make the test name reflect that? ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.table.test; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.tests.util.flink.container.FlinkContainers; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; +import software.amazon.awssdk.services.iam.IamAsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIamClient; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.readObjectsFromS3Bucket; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient; + +/** End to End test for Kinesis Firehose Table sink API. */ +public class KinesisFirehoseTableITTest extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseTableITTest.class); + + private static final String ROLE_NAME = "super-role"; + private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME; + private static final String BUCKET_NAME = "s3-firehose"; + private static final String STREAM_NAME = "s3-stream"; + + private final Path sqlConnectorFirehoseJar = TestUtils.getResource(".*firehose.jar"); + + private SdkAsyncHttpClient httpClient; + private S3AsyncClient s3AsyncClient; + private FirehoseAsyncClient firehoseAsyncClient; + private IamAsyncClient iamAsyncClient; + + private static final int NUM_ELEMENTS = 5; + private static final Network network = Network.newNetwork(); + + @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); + + @ClassRule + public static LocalstackContainer mockFirehoseContainer = + new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)) + .withNetwork(network) + .withNetworkAliases("localstack"); + + public static final FlinkContainers FLINK = + FlinkContainers.builder() + .setEnvironmentVariable("AWS_CBOR_DISABLE", "1") + .setEnvironmentVariable( + "FLINK_ENV_JAVA_OPTS", + "-Dorg.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false") + .setNetwork(network) + .setLogger(LOG) + .dependsOn(mockFirehoseContainer) + .build(); + + @Before + public void setup() throws Exception { + System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); + + // create client needed for clients + httpClient = createHttpClient(mockFirehoseContainer.getEndpoint()); + + // creating clients needed for test Review comment: These comments add little value and should be dropped as per the coding standards: https://flink.apache.org/contributing/code-style-and-quality-common.html#comments > // create client needed for clients This seems like a recursive statement ########## File path: flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtilsTest.java ########## @@ -28,51 +28,50 @@ import java.util.Map; import java.util.Properties; -/** Unit tests for {@link KinesisAsyncClientOptionsUtils}. */ -public class KinesisAsyncClientOptionsUtilsTest extends TestLogger { +/** Unit tests for {@link AsyncClientOptionsUtils}. */ +public class AsyncClientOptionsUtilsTest extends TestLogger { @Test public void testGoodKinesisClientOptionsMapping() { Review comment: Naming of methods is now wrong, since it is not kinesis specific ########## File path: flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml ########## @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-connectors</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId> + <name>Flink : Connectors : SQL : AWS Kinesis Data Firehose</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-kinesis-firehose</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>org.apache.flink:flink-connector-base</include> + <include>org.apache.flink:flink-connector-aws-base</include> + <include>org.apache.flink:flink-connector-aws-kinesis-firehose</include> + <include>software.amazon.awssdk:*</include> + <include>io.netty:*</include> + <include>org.reactivestreams:*</include> + <include>org.apache.httpcomponents:*</include> + <include>com.typesafe.netty:*</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>software.amazon</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.software.amazon</shadedPattern> + </relocation> + <relocation> + <pattern>io.netty</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.io.netty</shadedPattern> + </relocation> + <relocation> + <pattern>org.reactivestreams</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.org.reactivestreams</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.http</pattern> + <shadedPattern>org.apache.flink.kinesis-streams.shaded.org.apache.http</shadedPattern> + </relocation> + <relocation> + <pattern>com.typesafe.netty</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.com.typesafe.netty</shadedPattern> Review comment: generally not best practice to use hyphens in package names. We would usually match the prefix with the module package path, can you do that? ########## File path: flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml ########## @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-connectors</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId> + <name>Flink : Connectors : SQL : AWS Kinesis Data Firehose</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-kinesis-firehose</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>org.apache.flink:flink-connector-base</include> + <include>org.apache.flink:flink-connector-aws-base</include> + <include>org.apache.flink:flink-connector-aws-kinesis-firehose</include> + <include>software.amazon.awssdk:*</include> + <include>io.netty:*</include> + <include>org.reactivestreams:*</include> + <include>org.apache.httpcomponents:*</include> + <include>com.typesafe.netty:*</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>software.amazon</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.software.amazon</shadedPattern> + </relocation> + <relocation> + <pattern>io.netty</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.io.netty</shadedPattern> + </relocation> + <relocation> + <pattern>org.reactivestreams</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.org.reactivestreams</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.http</pattern> + <shadedPattern>org.apache.flink.kinesis-streams.shaded.org.apache.http</shadedPattern> + </relocation> + <relocation> + <pattern>com.typesafe.netty</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.com.typesafe.netty</shadedPattern> Review comment: I see the same is true in the kinesis SQL jar. Can we update that, or create a Jira please? ########## File path: flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtilsTest.java ########## @@ -28,51 +28,50 @@ import java.util.Map; import java.util.Properties; -/** Unit tests for {@link KinesisAsyncClientOptionsUtils}. */ -public class KinesisAsyncClientOptionsUtilsTest extends TestLogger { +/** Unit tests for {@link AsyncClientOptionsUtils}. */ +public class AsyncClientOptionsUtilsTest extends TestLogger { Review comment: nit: Add whitespace in the tests to make more readable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org