dannycranmer commented on a change in pull request #18603: URL: https://github.com/apache/flink/pull/18603#discussion_r803703950
########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseConnectorOptions.java ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions; + +/** Options for the Kinesis firehose connector. 
*/ +@PublicEvolving +public class KinesisFirehoseConnectorOptions extends AsyncSinkConnectorOptions { + + public static final ConfigOption<String> DELIVERY_STREAM = + ConfigOptions.key("delivery-stream") + .stringType() + .noDefaultValue() + .withDescription( + "Name of the Kinesis firehose delivery stream backing this table."); Review comment: Capital F for "Firehose" ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseConnectorOptions.java ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions; + +/** Options for the Kinesis firehose connector. 
*/ +@PublicEvolving +public class KinesisFirehoseConnectorOptions extends AsyncSinkConnectorOptions { + + public static final ConfigOption<String> DELIVERY_STREAM = + ConfigOptions.key("delivery-stream") + .stringType() + .noDefaultValue() + .withDescription( + "Name of the Kinesis firehose delivery stream backing this table."); + + public static final ConfigOption<Boolean> SINK_FAIL_ON_ERROR = + ConfigOptions.key("sink.fail-on-error") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional fail on error value for kinesis sink, default is false"); Review comment: typo "kinesis" > "Kinesis Firehose" ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.services.firehose.model.Record; + +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.Properties; + +/** Kinesis-firehose-backed {@link AsyncDynamicTableSink}. */ +@Internal +public class KinesisFirehoseDynamicSink extends AsyncDynamicTableSink<Record> { + + /** Consumed data type of the table. */ + private final DataType consumedDataType; + + /** The Kinesis stream to write to. */ + private final String deliveryStream; + + /** Properties for the Firehose DataStream Sink. */ + private final Properties firehoseClientProperties; + + /** Sink format for encoding records to Kinesis. 
*/ + private final EncodingFormat<SerializationSchema<RowData>> encodingFormat; + + private final Boolean failOnError; + + protected KinesisFirehoseDynamicSink( + @Nullable Integer maxBatchSize, + @Nullable Integer maxInFlightRequests, + @Nullable Integer maxBufferedRequests, + @Nullable Long maxBufferSizeInBytes, + @Nullable Long maxTimeInBufferMS, + @Nullable Boolean failOnError, + @Nullable DataType consumedDataType, + String deliveryStream, + @Nullable Properties firehoseClientProperties, + EncodingFormat<SerializationSchema<RowData>> encodingFormat) { + super( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBufferSizeInBytes, + maxTimeInBufferMS); + this.failOnError = failOnError; + this.firehoseClientProperties = firehoseClientProperties; + this.consumedDataType = + Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null"); + this.deliveryStream = + Preconditions.checkNotNull( + deliveryStream, "Firehose Delivery stream name must not be null"); + this.encodingFormat = + Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null"); + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return encodingFormat.getChangelogMode(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + SerializationSchema<RowData> serializationSchema = + encodingFormat.createRuntimeEncoder(context, consumedDataType); + + KinesisFirehoseSinkBuilder<RowData> builder = + KinesisFirehoseSink.<RowData>builder() + .setSerializationSchema(serializationSchema) + .setFirehoseClientProperties(firehoseClientProperties) + .setDeliveryStreamName(deliveryStream); + + Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError); + super.addAsyncOptionsToSinkBuilder(builder); + + KinesisFirehoseSink<RowData> kdsSink = builder.build(); + return SinkProvider.of(kdsSink); + } + + @Override + public DynamicTableSink copy() { + return new 
KinesisFirehoseDynamicSink( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBufferSizeInBytes, + maxTimeInBufferMS, + failOnError, + consumedDataType, + deliveryStream, + firehoseClientProperties, + encodingFormat); + } + + @Override + public String asSummaryString() { + return "firehose"; Review comment: Where is this rendered? I wonder if we should extend this? ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/util/KinesisFirehoseConnectorOptionUtils.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.table.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.aws.table.util.KinesisAsyncClientOptionsUtils; +import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator; +import org.apache.flink.connector.base.table.sink.options.SinkConnectorOptionsUtils; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.DELIVERY_STREAM; +import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.SINK_FAIL_ON_ERROR; + +/** Class for extracting firehose configurations from table options. */ +@Internal +public class KinesisFirehoseConnectorOptionUtils implements SinkConnectorOptionsUtils { + + public static final String KINESIS_CLIENT_PROPERTIES_KEY = "sink.client.properties"; + + private final AsyncSinkConfigurationValidator asyncSinkConfigurationValidator; + private final KinesisAsyncClientOptionsUtils kinesisClientOptionsUtils; Review comment: Rename this to something for Firehose or more general than Kinesis ########## File path: flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml ########## @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-connectors</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId> + <name>Flink : Connectors : SQL : AWS Kinesis Data Firehose</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-kinesis-firehose</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>org.apache.flink:flink-table-common</include> + <include>org.apache.flink:flink-streaming-java</include> + <include>org.apache.flink:flink-connector-base</include> + <include>org.apache.flink:flink-connector-aws-base</include> + <include>org.apache.flink:flink-connector-aws-kinesis-firehose</include> + <include>software.amazon.awssdk:*</include> + <include>io.netty:*</include> + <include>org.reactivestreams:*</include> + <include>org.apache.httpcomponents:*</include> + 
<include>com.typesafe.netty:*</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>software.amazon</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.software.amazon</shadedPattern> + </relocation> + <relocation> + <pattern>io.netty</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.io.netty</shadedPattern> + </relocation> + <relocation> + <pattern>org.reactivestreams</pattern> + <shadedPattern>org.apache.flink.kinesis-firehose.shaded.org.reactivestreams</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.http</pattern> + <shadedPattern>org.apache.flink.kinesis-streams.shaded.org.apache.http</shadedPattern> + </relocation> Review comment: Bad indentation ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.services.firehose.model.Record; + +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.Properties; + +/** Kinesis-firehose-backed {@link AsyncDynamicTableSink}. */ +@Internal +public class KinesisFirehoseDynamicSink extends AsyncDynamicTableSink<Record> { + + /** Consumed data type of the table. */ + private final DataType consumedDataType; + + /** The Kinesis stream to write to. */ + private final String deliveryStream; + + /** Properties for the Firehose DataStream Sink. */ + private final Properties firehoseClientProperties; + + /** Sink format for encoding records to Kinesis. 
*/ + private final EncodingFormat<SerializationSchema<RowData>> encodingFormat; + + private final Boolean failOnError; Review comment: Ah I see it defaults in the Builder (KinesisFirehoseSinkBuilder) ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/util/KinesisFirehoseConnectorOptionUtils.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.table.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.aws.table.util.KinesisAsyncClientOptionsUtils; +import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator; +import org.apache.flink.connector.base.table.sink.options.SinkConnectorOptionsUtils; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.DELIVERY_STREAM; +import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.SINK_FAIL_ON_ERROR; + +/** Class for extracting firehose configurations from table options. */ +@Internal +public class KinesisFirehoseConnectorOptionUtils implements SinkConnectorOptionsUtils { + + public static final String KINESIS_CLIENT_PROPERTIES_KEY = "sink.client.properties"; Review comment: `KINESIS_CLIENT_PROPERTIES_KEY` should be FIREHOSE? ########## File path: flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/KinesisAsyncClientOptionsUtilsTest.java ########## @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.kinesis.table.util; +package org.apache.flink.connector.aws.table.util; Review comment: We should rename this class to make it more general ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-kinesis-firehose/pom.xml ########## @@ -0,0 +1,151 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. 
The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + + <artifactId>flink-end-to-end-tests-kinesis-firehose</artifactId> Review comment: This module naming is inconsistent with the source. Please rename it to `flink-end-to-end-tests-aws-kinesis-firehose` for consistency. Can you also rename the Kinesis one for consistency in a separate commit/PR? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/table/sink/options/SinkConnectorOptionsUtils.java ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.table.sink.options; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.table.options.TableOptionsUtils; + +import java.util.Properties; + +/** Interface for classes handling table options for sink connectors. */ +@Internal +public interface SinkConnectorOptionsUtils extends TableOptionsUtils { + + /** + * Method for extracting and wrapping sink specific table options. + * + * @return Sink specific configuration. + */ + Properties getSinkProperties(); Review comment: I am not sure this is really Sink specific... It is just returning some properties, and it is not used by the KDS Async Sink ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFireHoseTableITTest.java ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.table.test; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.tests.util.flink.container.FlinkContainers; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; +import software.amazon.awssdk.services.iam.IamAsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import 
software.amazon.awssdk.services.s3.model.S3Object; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIamClient; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient; + +/** End to End test for Kinesis Firehose Table sink API. */ +public class KinesisFireHoseTableITTest extends TestLogger { Review comment: `KinesisFireHoseTableITTest` > `KinesisFirehoseTableITTest` ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory; +import org.apache.flink.connector.firehose.table.util.KinesisFirehoseConnectorOptionUtils; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.DELIVERY_STREAM; +import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.SINK_FAIL_ON_ERROR; +import static org.apache.flink.connector.firehose.table.util.KinesisFirehoseConnectorOptionUtils.KINESIS_CLIENT_PROPERTIES_KEY; +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; + +/** Factory for creating {@link KinesisFirehoseDynamicSink} . 
*/ +@Internal +public class KinesisFirehoseDynamicTableFactory extends AsyncDynamicTableSinkFactory { + + public static final String IDENTIFIER = "firehose"; + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + ReadableConfig tableOptions = helper.getOptions(); + ResolvedCatalogTable catalogTable = context.getCatalogTable(); + DataType physicalDataType = catalogTable.getResolvedSchema().toPhysicalRowDataType(); + + // initialize the table format early in order to register its consumedOptionKeys + // in the TableFactoryHelper, as those are needed for correct option validation + EncodingFormat<SerializationSchema<RowData>> encodingFormat = + helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT); Review comment: This looks like boiler plate, can it be pulled up to `AsyncDynamicTableSinkFactory`? ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.services.firehose.model.Record; + +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.Properties; + +/** Kinesis-firehose-backed {@link AsyncDynamicTableSink}. */ +@Internal +public class KinesisFirehoseDynamicSink extends AsyncDynamicTableSink<Record> { + + /** Consumed data type of the table. */ + private final DataType consumedDataType; + + /** The Kinesis stream to write to. */ Review comment: Kinesis Firehose Delivery Stream ########## File path: flink-connectors/flink-sql-connector-aws-kinesis-firehose/src/main/resources/META-INF/NOTICE ########## @@ -0,0 +1,53 @@ +flink-sql-connector-aws-kinesis-data-firehose + +Copyright 2014-2021 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- software.amazon.awssdk:kinesis:2.17.52 Review comment: Do we really include kinesis? Why? 
I will review this NOTICE again later ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.services.firehose.model.Record; + +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.Properties; + +/** Kinesis-firehose-backed {@link AsyncDynamicTableSink}. */ +@Internal +public class KinesisFirehoseDynamicSink extends AsyncDynamicTableSink<Record> { + + /** Consumed data type of the table. */ + private final DataType consumedDataType; + + /** The Kinesis stream to write to. */ + private final String deliveryStream; + + /** Properties for the Firehose DataStream Sink. */ + private final Properties firehoseClientProperties; + + /** Sink format for encoding records to Kinesis. 
*/ + private final EncodingFormat<SerializationSchema<RowData>> encodingFormat; + + private final Boolean failOnError; + + protected KinesisFirehoseDynamicSink( + @Nullable Integer maxBatchSize, + @Nullable Integer maxInFlightRequests, + @Nullable Integer maxBufferedRequests, + @Nullable Long maxBufferSizeInBytes, + @Nullable Long maxTimeInBufferMS, + @Nullable Boolean failOnError, + @Nullable DataType consumedDataType, + String deliveryStream, + @Nullable Properties firehoseClientProperties, + EncodingFormat<SerializationSchema<RowData>> encodingFormat) { + super( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBufferSizeInBytes, + maxTimeInBufferMS); + this.failOnError = failOnError; + this.firehoseClientProperties = firehoseClientProperties; + this.consumedDataType = + Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null"); + this.deliveryStream = + Preconditions.checkNotNull( + deliveryStream, "Firehose Delivery stream name must not be null"); + this.encodingFormat = + Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null"); + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return encodingFormat.getChangelogMode(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + SerializationSchema<RowData> serializationSchema = + encodingFormat.createRuntimeEncoder(context, consumedDataType); + + KinesisFirehoseSinkBuilder<RowData> builder = + KinesisFirehoseSink.<RowData>builder() + .setSerializationSchema(serializationSchema) + .setFirehoseClientProperties(firehoseClientProperties) + .setDeliveryStreamName(deliveryStream); + + Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError); + super.addAsyncOptionsToSinkBuilder(builder); + + KinesisFirehoseSink<RowData> kdsSink = builder.build(); + return SinkProvider.of(kdsSink); Review comment: nit: `return SinkProvider.of(builder.build());` ########## File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/table/sink/options/SinkConnectorOptionsUtils.java ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.table.sink.options; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.table.options.TableOptionsUtils; + +import java.util.Properties; + +/** Interface for classes handling table options for sink connectors. */ +@Internal +public interface SinkConnectorOptionsUtils extends TableOptionsUtils { Review comment: We already have an `AWSOptionUtils` why are we not using that for Firehose? ########## File path: flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml ########## @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-connectors</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId> + <name>Flink : Connectors : SQL : AWS Kinesis Data Firehose</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-kinesis-firehose</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>org.apache.flink:flink-table-common</include> + <include>org.apache.flink:flink-streaming-java</include> Review comment: I assume this is a no-op because it is a provided dependency, but please clean out. Also please update the Kinesis one as I see the same thing ########## File path: flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/KinesisAsyncClientOptionsUtils.java ########## @@ -16,11 +16,10 @@ * limitations under the License. 
*/ -package org.apache.flink.connector.kinesis.table.util; +package org.apache.flink.connector.aws.table.util; Review comment: We should rename this class to make it more general ########## File path: flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml ########## @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-connectors</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId> + <name>Flink : Connectors : SQL : AWS Kinesis Data Firehose</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-kinesis-firehose</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>org.apache.flink:flink-table-common</include> + <include>org.apache.flink:flink-streaming-java</include> Review comment: We should not be shading `flink-streaming-java` ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFireHoseTableITTest.java ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.table.test; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.tests.util.flink.container.FlinkContainers; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; +import software.amazon.awssdk.services.iam.IamAsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import 
software.amazon.awssdk.services.s3.model.S3Object; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIamClient; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient; + +/** End to End test for Kinesis Firehose Table sink API. 
*/ +public class KinesisFireHoseTableITTest extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisFireHoseTableITTest.class); + + private static final String ROLE_NAME = "super-role"; + private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME; + private static final String BUCKET_NAME = "s3-firehose"; + private static final String STREAM_NAME = "s3-stream"; + + private final Path sqlConnectorFirehoseJar = TestUtils.getResource(".*firehose.jar"); + + private S3AsyncClient s3AsyncClient; + private FirehoseAsyncClient firehoseAsyncClient; + private IamAsyncClient iamAsyncClient; + + private static final Network network = Network.newNetwork(); + + @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); + + @ClassRule + public static LocalstackContainer mockFirehoseContainer = + new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)) + .withNetwork(network) + .withNetworkAliases("localstack"); + + public static final FlinkContainers FLINK = + FlinkContainers.builder() + .setEnvironmentVariable("AWS_CBOR_DISABLE", "1") + .setEnvironmentVariable( + "FLINK_ENV_JAVA_OPTS", + "-Dorg.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false") + .setNetwork(network) + .setLogger(LOG) + .dependsOn(mockFirehoseContainer) + .build(); + + @Before + public void setup() throws Exception { + SdkAsyncHttpClient httpClient = createHttpClient(mockFirehoseContainer.getEndpoint()); + s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), httpClient); + firehoseAsyncClient = getFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient); + iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), httpClient); + System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); + LOG.info("1 - Creating the bucket for Firehose to deliver into..."); + createBucket(s3AsyncClient, BUCKET_NAME); + LOG.info("2 
- Creating the IAM Role for Firehose to write into the s3 bucket..."); + createIAMRole(iamAsyncClient, ROLE_NAME); + LOG.info("3 - Creating the Firehose delivery stream..."); + createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN, firehoseAsyncClient); + LOG.info("Done setting up the localstack."); Review comment: nit: Some newlines would make this more readable ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/util/KinesisFirehoseConnectorOptionUtils.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.table.util; Review comment: This package is inconsistent with the equivalent Kinesis one. Can you make them consistent? ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.services.firehose.model.Record; + +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.Properties; + +/** Kinesis-firehose-backed {@link AsyncDynamicTableSink}. */ +@Internal +public class KinesisFirehoseDynamicSink extends AsyncDynamicTableSink<Record> { + + /** Consumed data type of the table. */ + private final DataType consumedDataType; + + /** The Kinesis stream to write to. */ + private final String deliveryStream; + + /** Properties for the Firehose DataStream Sink. 
*/ + private final Properties firehoseClientProperties; + + /** Sink format for encoding records to Kinesis. */ + private final EncodingFormat<SerializationSchema<RowData>> encodingFormat; + + private final Boolean failOnError; Review comment: Why is this `Boolean` and not `boolean` ? Is this field still `@Nullable` here? ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.services.firehose.model.Record; + +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.Properties; + +/** Kinesis-firehose-backed {@link AsyncDynamicTableSink}. */ Review comment: nit: remove hyphens ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFireHoseTableITTest.java ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.table.test; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.tests.util.flink.container.FlinkContainers; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; +import software.amazon.awssdk.services.iam.IamAsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import 
software.amazon.awssdk.services.s3.model.S3Object; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIamClient; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient; + +/** End to End test for Kinesis Firehose Table sink API. 
*/ +public class KinesisFireHoseTableITTest extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisFireHoseTableITTest.class); + + private static final String ROLE_NAME = "super-role"; + private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME; + private static final String BUCKET_NAME = "s3-firehose"; + private static final String STREAM_NAME = "s3-stream"; + + private final Path sqlConnectorFirehoseJar = TestUtils.getResource(".*firehose.jar"); + + private S3AsyncClient s3AsyncClient; + private FirehoseAsyncClient firehoseAsyncClient; + private IamAsyncClient iamAsyncClient; + + private static final Network network = Network.newNetwork(); + + @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); + + @ClassRule + public static LocalstackContainer mockFirehoseContainer = + new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)) + .withNetwork(network) + .withNetworkAliases("localstack"); + + public static final FlinkContainers FLINK = + FlinkContainers.builder() + .setEnvironmentVariable("AWS_CBOR_DISABLE", "1") + .setEnvironmentVariable( + "FLINK_ENV_JAVA_OPTS", + "-Dorg.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false") + .setNetwork(network) + .setLogger(LOG) + .dependsOn(mockFirehoseContainer) + .build(); + + @Before + public void setup() throws Exception { + SdkAsyncHttpClient httpClient = createHttpClient(mockFirehoseContainer.getEndpoint()); + s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), httpClient); + firehoseAsyncClient = getFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient); + iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), httpClient); + System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); + LOG.info("1 - Creating the bucket for Firehose to deliver into..."); + createBucket(s3AsyncClient, BUCKET_NAME); + LOG.info("2 
- Creating the IAM Role for Firehose to write into the s3 bucket..."); + createIAMRole(iamAsyncClient, ROLE_NAME); + LOG.info("3 - Creating the Firehose delivery stream..."); + createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN, firehoseAsyncClient); + LOG.info("Done setting up the localstack."); + } + + @BeforeClass + public static void setupFlink() throws Exception { + FLINK.start(); + } + + @AfterClass + public static void stopFlink() { + FLINK.stop(); + } + + @After + public void teardown() { + System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property()); + } + + @Test + public void testTableApiSink() throws Exception { + List<Order> orderList = + ImmutableList.of( + new Order("A", 1), + new Order("B", 2), + new Order("C", 3), + new Order("D", 4), + new Order("E", 5)); + + executeSqlStatements(readSqlFile("send-orders.sql")); + List<Order> orders = readFromS3(); + Assertions.assertThat(orders).containsAll(orderList); + } + + private void executeSqlStatements(final List<String> sqlLines) throws Exception { + FLINK.submitSQLJob( + new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) + .addJars(sqlConnectorFirehoseJar) + .build()); + } + + private List<String> readSqlFile(final String resourceName) throws Exception { + return Files.readAllLines(Paths.get(getClass().getResource("/" + resourceName).toURI())); + } + + private List<Order> readFromS3() throws Exception { + + Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1)); + List<S3Object> ordersObjects; + List<Order> orders = new ArrayList<>(); + do { + Thread.sleep(1000); + ordersObjects = listBucketObjects(s3AsyncClient, BUCKET_NAME); + ordersObjects.forEach(order -> orders.add(readFromS3(order.key()))); + } while (deadline.hasTimeLeft() && orders.size() < 5); Review comment: nit: Magic number 5 could be extracted to variable and used to construct the test data too -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org